diff --git a/pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/AsciiLongArrayView.java b/pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/AsciiLongArrayView.java index 7b81885da7437..f436e09f58c5a 100644 --- a/pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/AsciiLongArrayView.java +++ b/pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/AsciiLongArrayView.java @@ -129,10 +129,10 @@ public long element() { } } - public double elementAsDouble() { - String token = new String(buffer, tokenOffset, tokenNumBytes, Charsets.US_ASCII); - return Double.valueOf(token); - } +// public double elementAsDouble() { +// String token = new String(buffer, tokenOffset, tokenNumBytes, Charsets.US_ASCII); +// return Double.valueOf(token); +// } @Override diff --git a/pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/CompensatableDanglingPageRank.java b/pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/CompensatableDanglingPageRank.java index d3728aecf2890..60c5191f24fc1 100644 --- a/pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/CompensatableDanglingPageRank.java +++ b/pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/CompensatableDanglingPageRank.java @@ -111,7 +111,7 @@ public static JobGraph getJobGraph(String[] args) throws Exception { // --------------- the inputs --------------------- // page rank input - JobInputVertex pageWithRankInput = JobGraphUtils.createInput(DanglingPageGenerateRankInputFormat.class, + JobInputVertex pageWithRankInput = JobGraphUtils.createInput(ImprovedDanglingPageRankInputFormat.class, pageWithRankInputPath, "DanglingPageWithRankInput", jobGraph, degreeOfParallelism, numSubTasksPerInstance); TaskConfig pageWithRankInputConfig = new TaskConfig(pageWithRankInput.getConfiguration()); pageWithRankInputConfig.addOutputShipStrategy(ShipStrategyType.PARTITION_HASH); @@ -120,7 +120,7 @@ public static JobGraph getJobGraph(String[] args) throws Exception { pageWithRankInputConfig.setStubParameter("pageRank.numVertices", String.valueOf(numVertices)); // edges as adjacency list - JobInputVertex adjacencyListInput = JobGraphUtils.createInput(AdjacencyListInputFormat.class, + JobInputVertex adjacencyListInput = JobGraphUtils.createInput(ImprovedAdjacencyListInputFormat.class, adjacencyListInputPath, "AdjancencyListInput", jobGraph, degreeOfParallelism, numSubTasksPerInstance); TaskConfig adjacencyListInputConfig = new TaskConfig(adjacencyListInput.getConfiguration()); adjacencyListInputConfig.addOutputShipStrategy(ShipStrategyType.PARTITION_HASH); diff --git a/pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/custom/CustomCompensatableDanglingPageRank.java b/pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/custom/CustomCompensatableDanglingPageRank.java new file mode 100644 index 0000000000000..cc5f2a954c18b --- /dev/null +++ b/pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/custom/CustomCompensatableDanglingPageRank.java @@ -0,0 +1,305 @@ +/*********************************************************************************************************************** + * + * Copyright (C) 2012 by the Stratosphere project (http://stratosphere.eu) + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + * + **********************************************************************************************************************/ + +package eu.stratosphere.pact.runtime.iterative.compensatable.danglingpagerank.custom; + +import eu.stratosphere.nephele.configuration.Configuration; +import eu.stratosphere.nephele.configuration.GlobalConfiguration; +import eu.stratosphere.nephele.io.DistributionPattern; +import eu.stratosphere.nephele.io.channels.ChannelType; +import eu.stratosphere.nephele.jobgraph.JobGraph; +import eu.stratosphere.nephele.jobgraph.JobInputVertex; +import eu.stratosphere.nephele.jobgraph.JobOutputVertex; +import eu.stratosphere.nephele.jobgraph.JobTaskVertex; +import eu.stratosphere.pact.common.io.FileOutputFormat; +import eu.stratosphere.pact.generic.types.TypeComparatorFactory; +import eu.stratosphere.pact.generic.types.TypePairComparatorFactory; +import eu.stratosphere.pact.generic.types.TypeSerializerFactory; +import eu.stratosphere.pact.runtime.iterative.compensatable.danglingpagerank.DiffL1NormConvergenceCriterion; +import eu.stratosphere.pact.runtime.iterative.compensatable.danglingpagerank.custom.types.VertexWithAdjacencyList; +import eu.stratosphere.pact.runtime.iterative.compensatable.danglingpagerank.custom.types.VertexWithAdjacencyListComparatorFactory; +import eu.stratosphere.pact.runtime.iterative.compensatable.danglingpagerank.custom.types.VertexWithAdjacencyListSerializerFactory; +import eu.stratosphere.pact.runtime.iterative.compensatable.danglingpagerank.custom.types.VertexWithRank; +import eu.stratosphere.pact.runtime.iterative.compensatable.danglingpagerank.custom.types.VertexWithRankAndDangling; +import eu.stratosphere.pact.runtime.iterative.compensatable.danglingpagerank.custom.types.VertexWithRankAndDanglingComparatorFactory; +import eu.stratosphere.pact.runtime.iterative.compensatable.danglingpagerank.custom.types.VertexWithRankAndDanglingSerializerFactory; +import eu.stratosphere.pact.runtime.iterative.compensatable.danglingpagerank.custom.types.VertexWithRankComparatorFactory; +import eu.stratosphere.pact.runtime.iterative.compensatable.danglingpagerank.custom.types.VertexWithRankDanglingToVertexWithAdjacencyListPairComparatorFactory; +import eu.stratosphere.pact.runtime.iterative.compensatable.danglingpagerank.custom.types.VertexWithRankDanglingToVertexWithRankPairComparatorFactory; +import eu.stratosphere.pact.runtime.iterative.compensatable.danglingpagerank.custom.types.VertexWithRankSerializerFactory; +import eu.stratosphere.pact.runtime.iterative.playing.JobGraphUtils; +import eu.stratosphere.pact.runtime.iterative.playing.PlayConstants; +import eu.stratosphere.pact.runtime.iterative.task.IterationHeadPactTask; +import eu.stratosphere.pact.runtime.iterative.task.IterationIntermediatePactTask; +import eu.stratosphere.pact.runtime.iterative.task.IterationTailPactTask; +import eu.stratosphere.pact.runtime.shipping.ShipStrategyType; +import eu.stratosphere.pact.runtime.task.BuildSecondCachedMatchDriver; +import eu.stratosphere.pact.runtime.task.CoGroupDriver; +import eu.stratosphere.pact.runtime.task.DriverStrategy; +import eu.stratosphere.pact.runtime.task.MapDriver; +import eu.stratosphere.pact.runtime.task.util.LocalStrategy; +import eu.stratosphere.pact.runtime.task.util.TaskConfig; + +public class CustomCompensatableDanglingPageRank { + + private static final int NUM_FILE_HANDLES_PER_SORT = 64; + + private static final float SORT_SPILL_THRESHOLD = 0.85f; + + + private static TypeSerializerFactory vertexWithRankSerializer = new VertexWithRankSerializerFactory(); + + private static TypeSerializerFactory vertexWithRankAndDanglingSerializer = new VertexWithRankAndDanglingSerializerFactory(); + + private static TypeSerializerFactory vertexWithAdjacencyListSerializer = new VertexWithAdjacencyListSerializerFactory(); + + private static TypeComparatorFactory vertexWithRankComparator = new VertexWithRankComparatorFactory(); + + private static TypeComparatorFactory vertexWithRankAndDanglingComparator = new VertexWithRankAndDanglingComparatorFactory(); + + private static TypeComparatorFactory vertexWithAdjacencyListComparator = new VertexWithAdjacencyListComparatorFactory(); + + private static TypePairComparatorFactory matchComparator = + new VertexWithRankDanglingToVertexWithAdjacencyListPairComparatorFactory(); + + private static TypePairComparatorFactory coGroupComparator = + new VertexWithRankDanglingToVertexWithRankPairComparatorFactory(); + + + public static void main(String[] args) throws Exception { + String confPath = args.length >= 6 ? confPath = args[5] : PlayConstants.PLAY_DIR + "local-conf"; + + GlobalConfiguration.loadConfiguration(confPath); + Configuration conf = GlobalConfiguration.getConfiguration(); + + JobGraph jobGraph = getJobGraph(args); + JobGraphUtils.submit(jobGraph, conf); + } + + public static JobGraph getJobGraph(String[] args) throws Exception { + + int degreeOfParallelism = 2; + int numSubTasksPerInstance = degreeOfParallelism; + String pageWithRankInputPath = "file://" + PlayConstants.PLAY_DIR + "test-inputs/danglingpagerank/pageWithRank"; + String adjacencyListInputPath = "file://" + PlayConstants.PLAY_DIR + + "test-inputs/danglingpagerank/adjacencylists"; + String outputPath = "file:///tmp/stratosphere/iterations"; +// String confPath = PlayConstants.PLAY_DIR + "local-conf"; + int minorConsumer = 25; + int matchMemory = 50; + int coGroupSortMemory = 50; + int numIterations = 25; + long numVertices = 5; + long numDanglingVertices = 1; + + String failingWorkers = "1"; + int failingIteration = 2; + double messageLoss = 0.75; + + if (args.length >= 15) { + degreeOfParallelism = Integer.parseInt(args[0]); + numSubTasksPerInstance = Integer.parseInt(args[1]); + pageWithRankInputPath = args[2]; + adjacencyListInputPath = args[3]; + outputPath = args[4]; +// confPath = args[5]; + minorConsumer = Integer.parseInt(args[6]); + matchMemory = Integer.parseInt(args[7]); + coGroupSortMemory = Integer.parseInt(args[8]); + numIterations = Integer.parseInt(args[9]); + numVertices = Long.parseLong(args[10]); + numDanglingVertices = Long.parseLong(args[11]); + failingWorkers = args[12]; + failingIteration = Integer.parseInt(args[13]); + messageLoss = Double.parseDouble(args[14]); + } + + JobGraph jobGraph = new JobGraph("CompensatableDanglingPageRank"); + + // --------------- the inputs --------------------- + + // page rank input + JobInputVertex pageWithRankInput = JobGraphUtils.createInput(CustomImprovedDanglingPageRankInputFormat.class, + pageWithRankInputPath, "DanglingPageWithRankInput", jobGraph, degreeOfParallelism, numSubTasksPerInstance); + TaskConfig pageWithRankInputConfig = new TaskConfig(pageWithRankInput.getConfiguration()); + pageWithRankInputConfig.addOutputShipStrategy(ShipStrategyType.PARTITION_HASH); + pageWithRankInputConfig.setOutputComparator(vertexWithRankAndDanglingComparator, 0); + pageWithRankInputConfig.setOutputSerializer(vertexWithRankAndDanglingSerializer); + pageWithRankInputConfig.setStubParameter("pageRank.numVertices", String.valueOf(numVertices)); + + // edges as adjacency list + JobInputVertex adjacencyListInput = JobGraphUtils.createInput(CustomImprovedAdjacencyListInputFormat.class, + adjacencyListInputPath, "AdjancencyListInput", jobGraph, degreeOfParallelism, numSubTasksPerInstance); + TaskConfig adjacencyListInputConfig = new TaskConfig(adjacencyListInput.getConfiguration()); + adjacencyListInputConfig.addOutputShipStrategy(ShipStrategyType.PARTITION_HASH); + adjacencyListInputConfig.setOutputSerializer(vertexWithAdjacencyListSerializer); + adjacencyListInputConfig.setOutputComparator(vertexWithAdjacencyListComparator, 0); + + // --------------- the head --------------------- + JobTaskVertex head = JobGraphUtils.createTask(IterationHeadPactTask.class, "IterationHead", jobGraph, + degreeOfParallelism, numSubTasksPerInstance); + TaskConfig headConfig = new TaskConfig(head.getConfiguration()); + + // initial input / partial solution + headConfig.addInputToGroup(0); + headConfig.setIterationHeadPartialSolutionInputIndex(0); + headConfig.setInputSerializer(vertexWithRankAndDanglingSerializer, 0); + headConfig.setInputComparator(vertexWithRankAndDanglingComparator, 0); + headConfig.setInputLocalStrategy(0, LocalStrategy.SORT); + headConfig.setMemoryInput(0, minorConsumer * JobGraphUtils.MEGABYTE); + headConfig.setFilehandlesInput(0, NUM_FILE_HANDLES_PER_SORT); + headConfig.setSpillingThresholdInput(0, SORT_SPILL_THRESHOLD); + + // back channel / iterations + headConfig.setBackChannelMemory(minorConsumer * JobGraphUtils.MEGABYTE); + + // output into iteration + headConfig.setOutputSerializer(vertexWithRankAndDanglingSerializer); + headConfig.addOutputShipStrategy(ShipStrategyType.FORWARD); + headConfig.addOutputShipStrategy(ShipStrategyType.FORWARD); + + // final output + TaskConfig headFinalOutConfig = new TaskConfig(new Configuration()); + headFinalOutConfig.setOutputSerializer(vertexWithRankAndDanglingSerializer); + headFinalOutConfig.addOutputShipStrategy(ShipStrategyType.FORWARD); + headConfig.setIterationHeadFinalOutputConfig(headFinalOutConfig); + + // the sync + headConfig.setIterationHeadIndexOfSyncOutput(3); + headConfig.setNumberOfIterations(numIterations); + + // the driver + headConfig.setDriver(MapDriver.class); + headConfig.setDriverStrategy(DriverStrategy.MAP); + headConfig.setStubClass(CustomCompensatingMap.class); + headConfig.setStubParameter("pageRank.numVertices", String.valueOf(numVertices)); + headConfig.setStubParameter("compensation.failingWorker", failingWorkers); + headConfig.setStubParameter("compensation.failingIteration", String.valueOf(failingIteration)); + headConfig.setStubParameter("compensation.messageLoss", String.valueOf(messageLoss)); + + // --------------- the join --------------------- + + JobTaskVertex intermediate = JobGraphUtils.createTask(IterationIntermediatePactTask.class, + "IterationIntermediate", jobGraph, degreeOfParallelism, numSubTasksPerInstance); + TaskConfig intermediateConfig = new TaskConfig(intermediate.getConfiguration()); +// intermediateConfig.setDriver(RepeatableHashjoinMatchDriverWithCachedBuildside.class); + intermediateConfig.setDriver(BuildSecondCachedMatchDriver.class); + intermediateConfig.setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_SECOND); + intermediateConfig.setMemoryDriver(matchMemory * JobGraphUtils.MEGABYTE); + intermediateConfig.addInputToGroup(0); + intermediateConfig.addInputToGroup(1); + intermediateConfig.setInputSerializer(vertexWithRankAndDanglingSerializer, 0); + intermediateConfig.setInputSerializer(vertexWithAdjacencyListSerializer, 1); + intermediateConfig.setDriverComparator(vertexWithRankAndDanglingComparator, 0); + intermediateConfig.setDriverComparator(vertexWithAdjacencyListComparator, 1); + intermediateConfig.setDriverPairComparator(matchComparator); + + intermediateConfig.setOutputSerializer(vertexWithRankSerializer); + intermediateConfig.addOutputShipStrategy(ShipStrategyType.PARTITION_HASH); + intermediateConfig.setOutputComparator(vertexWithRankComparator, 0); + + intermediateConfig.setStubClass(CustomCompensatableDotProductMatch.class); + intermediateConfig.setStubParameter("pageRank.numVertices", String.valueOf(numVertices)); + intermediateConfig.setStubParameter("compensation.failingWorker", failingWorkers); + intermediateConfig.setStubParameter("compensation.failingIteration", String.valueOf(failingIteration)); + intermediateConfig.setStubParameter("compensation.messageLoss", String.valueOf(messageLoss)); + + // ---------------- the tail (co group) -------------------- + + JobTaskVertex tail = JobGraphUtils.createTask(IterationTailPactTask.class, "IterationTail", jobGraph, + degreeOfParallelism, numSubTasksPerInstance); + TaskConfig tailConfig = new TaskConfig(tail.getConfiguration()); + // TODO we need to combine! + + // inputs and driver + tailConfig.setDriver(CoGroupDriver.class); + tailConfig.setDriverStrategy(DriverStrategy.CO_GROUP); + tailConfig.addInputToGroup(0); + tailConfig.addInputToGroup(1); + tailConfig.setInputSerializer(vertexWithRankAndDanglingSerializer, 0); + tailConfig.setInputSerializer(vertexWithRankSerializer, 1); + tailConfig.setDriverComparator(vertexWithRankAndDanglingComparator, 0); + tailConfig.setDriverComparator(vertexWithRankComparator, 1); + tailConfig.setDriverPairComparator(coGroupComparator); + tailConfig.setInputAsynchronouslyMaterialized(0, true); + tailConfig.setInputMaterializationMemory(0, minorConsumer * JobGraphUtils.MEGABYTE); + tailConfig.setInputLocalStrategy(1, LocalStrategy.SORT); + tailConfig.setInputComparator(vertexWithRankComparator, 1); + tailConfig.setMemoryInput(1, coGroupSortMemory * JobGraphUtils.MEGABYTE); + tailConfig.setFilehandlesInput(1, NUM_FILE_HANDLES_PER_SORT); + tailConfig.setSpillingThresholdInput(1, SORT_SPILL_THRESHOLD); + + // output + tailConfig.addOutputShipStrategy(ShipStrategyType.FORWARD); + tailConfig.setOutputSerializer(vertexWithRankAndDanglingSerializer); + + // the stub + tailConfig.setStubClass(CustomCompensatableDotProductCoGroup.class); + tailConfig.setStubParameter("pageRank.numVertices", String.valueOf(numVertices)); + tailConfig.setStubParameter("pageRank.numDanglingVertices", String.valueOf(numDanglingVertices)); + tailConfig.setStubParameter("compensation.failingWorker", failingWorkers); + tailConfig.setStubParameter("compensation.failingIteration", String.valueOf(failingIteration)); + tailConfig.setStubParameter("compensation.messageLoss", String.valueOf(messageLoss)); + + // --------------- the output --------------------- + + JobOutputVertex output = JobGraphUtils.createFileOutput(jobGraph, "FinalOutput", degreeOfParallelism, + numSubTasksPerInstance); + TaskConfig outputConfig = new TaskConfig(output.getConfiguration()); + outputConfig.addInputToGroup(0); + outputConfig.setInputSerializer(vertexWithRankAndDanglingSerializer, 0); + outputConfig.setStubClass(CustomPageWithRankOutFormat.class); + outputConfig.setStubParameter(FileOutputFormat.FILE_PARAMETER_KEY, outputPath); + + // --------------- the auxiliaries --------------------- + + JobOutputVertex fakeTailOutput = JobGraphUtils.createFakeOutput(jobGraph, "FakeTailOutput", + degreeOfParallelism, numSubTasksPerInstance); + + JobOutputVertex sync = JobGraphUtils.createSync(jobGraph, degreeOfParallelism); + TaskConfig syncConfig = new TaskConfig(sync.getConfiguration()); + syncConfig.setNumberOfIterations(numIterations); + syncConfig.setConvergenceCriterion(DiffL1NormConvergenceCriterion.class); + + // --------------- the wiring --------------------- + + JobGraphUtils.connect(pageWithRankInput, head, ChannelType.NETWORK, DistributionPattern.BIPARTITE); + + JobGraphUtils.connect(head, intermediate, ChannelType.INMEMORY, DistributionPattern.POINTWISE); + intermediateConfig.setGateIterativeWithNumberOfEventsUntilInterrupt(0, 1); + + JobGraphUtils.connect(adjacencyListInput, intermediate, ChannelType.NETWORK, DistributionPattern.BIPARTITE); + + JobGraphUtils.connect(head, tail, ChannelType.NETWORK, DistributionPattern.POINTWISE); + JobGraphUtils.connect(intermediate, tail, ChannelType.NETWORK, DistributionPattern.BIPARTITE); + tailConfig.setGateIterativeWithNumberOfEventsUntilInterrupt(0, 1); + tailConfig.setGateIterativeWithNumberOfEventsUntilInterrupt(1, degreeOfParallelism); + + JobGraphUtils.connect(head, output, ChannelType.INMEMORY, DistributionPattern.POINTWISE); + JobGraphUtils.connect(tail, fakeTailOutput, ChannelType.INMEMORY, DistributionPattern.POINTWISE); + + JobGraphUtils.connect(head, sync, ChannelType.NETWORK, DistributionPattern.POINTWISE); + + fakeTailOutput.setVertexToShareInstancesWith(tail); + tail.setVertexToShareInstancesWith(head); + pageWithRankInput.setVertexToShareInstancesWith(head); + adjacencyListInput.setVertexToShareInstancesWith(head); + intermediate.setVertexToShareInstancesWith(head); + output.setVertexToShareInstancesWith(head); + sync.setVertexToShareInstancesWith(head); + + return jobGraph; + } +} diff --git a/pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/custom/CustomCompensatableDotProductCoGroup.java b/pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/custom/CustomCompensatableDotProductCoGroup.java new file mode 100644 index 0000000000000..b500d321a62bf --- /dev/null +++ b/pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/custom/CustomCompensatableDotProductCoGroup.java @@ -0,0 +1,110 @@ +package eu.stratosphere.pact.runtime.iterative.compensatable.danglingpagerank.custom; + +import eu.stratosphere.nephele.configuration.Configuration; +import eu.stratosphere.pact.common.stubs.Collector; +import eu.stratosphere.pact.generic.stub.AbstractStub; +import eu.stratosphere.pact.generic.stub.GenericCoGrouper; +import eu.stratosphere.pact.runtime.iterative.compensatable.ConfigUtils; +import eu.stratosphere.pact.runtime.iterative.compensatable.danglingpagerank.DiffL1NormConvergenceCriterion; +import eu.stratosphere.pact.runtime.iterative.compensatable.danglingpagerank.PageRankStats; +import eu.stratosphere.pact.runtime.iterative.compensatable.danglingpagerank.PageRankStatsAggregator; +import eu.stratosphere.pact.runtime.iterative.compensatable.danglingpagerank.custom.types.VertexWithRank; +import eu.stratosphere.pact.runtime.iterative.compensatable.danglingpagerank.custom.types.VertexWithRankAndDangling; +import eu.stratosphere.pact.runtime.iterative.concurrent.IterationContext; + +import java.util.Iterator; +import java.util.Set; + +public class CustomCompensatableDotProductCoGroup extends AbstractStub implements GenericCoGrouper { + + private VertexWithRankAndDangling accumulator = new VertexWithRankAndDangling(); + + private PageRankStatsAggregator aggregator = + (PageRankStatsAggregator) new DiffL1NormConvergenceCriterion().createAggregator(); + + private long numVertices; + + private long numDanglingVertices; + + private double dampingFactor; + + private double danglingRankFactor; + + private static final double BETA = 0.85; + + private int workerIndex; + + private int currentIteration; + + private int failingIteration; + + private Set failingWorkers; + + @Override + public void open(Configuration parameters) throws Exception { + workerIndex = ConfigUtils.asInteger("pact.parallel.task.id", parameters); + currentIteration = ConfigUtils.asInteger("pact.iterations.currentIteration", parameters); + failingIteration = ConfigUtils.asInteger("compensation.failingIteration", parameters); + failingWorkers = ConfigUtils.asIntSet("compensation.failingWorker", parameters); + + numVertices = ConfigUtils.asLong("pageRank.numVertices", parameters); + numDanglingVertices = ConfigUtils.asLong("pageRank.numDanglingVertices", parameters); + + aggregator.reset(); + + dampingFactor = (1d - BETA) / (double) numVertices; + + if (currentIteration == 1) { + danglingRankFactor = BETA * (double) numDanglingVertices / ((double) numVertices * (double) numVertices); + } else { + PageRankStats previousAggregate = (PageRankStats) IterationContext.instance().getGlobalAggregate( + workerIndex); + danglingRankFactor = BETA * previousAggregate.danglingRank() / (double) numVertices; + } + } + + @Override + public void coGroup(Iterator currentPageRankIterator, Iterator partialRanks, + Collector collector) + { + if (!currentPageRankIterator.hasNext()) { + long missingVertex = partialRanks.next().getVertexID(); + throw new IllegalStateException("No current page rank for vertex [" + missingVertex + "]!"); + } + + VertexWithRankAndDangling currentPageRank = currentPageRankIterator.next(); + + long edges = 0; + double summedRank = 0; + while (partialRanks.hasNext()) { + summedRank += partialRanks.next().getRank(); + edges++; + } + + double rank = BETA * summedRank + dampingFactor + danglingRankFactor; + + double currentRank = currentPageRank.getRank(); + boolean isDangling = currentPageRank.isDangling(); + + double danglingRankToAggregate = isDangling ? rank : 0; + long danglingVerticesToAggregate = isDangling ? 1 : 0; + + double diff = Math.abs(currentRank - rank); + + aggregator.aggregate(diff, rank, danglingRankToAggregate, danglingVerticesToAggregate, 1, edges, summedRank, 0); + + accumulator.setVertexID(currentPageRank.getVertexID()); + accumulator.setRank(rank); + accumulator.setDangling(isDangling); + + collector.collect(accumulator); + } + + @Override + public void close() throws Exception { + if (currentIteration == failingIteration && failingWorkers.contains(workerIndex)) { + aggregator.reset(); + } + IterationContext.instance().setAggregate(workerIndex, aggregator.getAggregate()); + } +} diff --git a/pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/custom/CustomCompensatableDotProductMatch.java b/pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/custom/CustomCompensatableDotProductMatch.java new file mode 100644 index 0000000000000..e7d022e32710c --- /dev/null +++ b/pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/custom/CustomCompensatableDotProductMatch.java @@ -0,0 +1,73 @@ +/*********************************************************************************************************************** + * + * Copyright (C) 2012 by the Stratosphere project (http://stratosphere.eu) + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + * + **********************************************************************************************************************/ + +package eu.stratosphere.pact.runtime.iterative.compensatable.danglingpagerank.custom; + +import eu.stratosphere.nephele.configuration.Configuration; +import eu.stratosphere.pact.common.stubs.Collector; +import eu.stratosphere.pact.generic.stub.AbstractStub; +import eu.stratosphere.pact.generic.stub.GenericMatcher; +import eu.stratosphere.pact.runtime.iterative.compensatable.ConfigUtils; +import eu.stratosphere.pact.runtime.iterative.compensatable.danglingpagerank.custom.types.VertexWithAdjacencyList; +import eu.stratosphere.pact.runtime.iterative.compensatable.danglingpagerank.custom.types.VertexWithRank; +import eu.stratosphere.pact.runtime.iterative.compensatable.danglingpagerank.custom.types.VertexWithRankAndDangling; + +import java.util.Random; +import java.util.Set; + +public class CustomCompensatableDotProductMatch extends AbstractStub implements + GenericMatcher { + + private VertexWithRank record = new VertexWithRank(); + + private Random random = new Random(); + + private double messageLoss; + + private boolean isFailure; + + @Override + public void open(Configuration parameters) throws Exception { + int workerIndex = ConfigUtils.asInteger("pact.parallel.task.id", parameters); + int currentIteration = ConfigUtils.asInteger("pact.iterations.currentIteration", parameters); + int failingIteration = ConfigUtils.asInteger("compensation.failingIteration", parameters); + Set failingWorkers = ConfigUtils.asIntSet("compensation.failingWorker", parameters); + isFailure = currentIteration == failingIteration && failingWorkers.contains(workerIndex); + messageLoss = ConfigUtils.asDouble("compensation.messageLoss", parameters); + } + + @Override + public void match(VertexWithRankAndDangling pageWithRank, VertexWithAdjacencyList adjacencyList, Collector collector) + throws Exception + { + double rank = pageWithRank.getRank(); + long[] adjacentNeighbors = adjacencyList.getTargets(); + int numNeighbors = adjacencyList.getNumTargets(); + + double rankToDistribute = rank / (double) numNeighbors; + record.setRank(rankToDistribute); + + for (int n = 0; n < numNeighbors; n++) { + record.setVertexID(adjacentNeighbors[n]); + if (isFailure) { + if (random.nextDouble() >= messageLoss) { + collector.collect(record); + } + } else { + collector.collect(record); + } + } + } +} diff --git a/pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/custom/CustomCompensatingMap.java b/pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/custom/CustomCompensatingMap.java new file mode 100644 index 0000000000000..ccf8a772dcb48 --- /dev/null +++ b/pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/custom/CustomCompensatingMap.java @@ -0,0 +1,61 @@ +package eu.stratosphere.pact.runtime.iterative.compensatable.danglingpagerank.custom; + +import eu.stratosphere.nephele.configuration.Configuration; +import eu.stratosphere.pact.common.stubs.Collector; +import eu.stratosphere.pact.generic.stub.AbstractStub; +import eu.stratosphere.pact.generic.stub.GenericMapper; +import eu.stratosphere.pact.runtime.iterative.compensatable.ConfigUtils; +import eu.stratosphere.pact.runtime.iterative.compensatable.danglingpagerank.PageRankStats; +import eu.stratosphere.pact.runtime.iterative.compensatable.danglingpagerank.custom.types.VertexWithRankAndDangling; +import eu.stratosphere.pact.runtime.iterative.concurrent.IterationContext; + +import java.util.Set; + +public class CustomCompensatingMap extends AbstractStub implements GenericMapper { + + private boolean isFailureIteration; + + private boolean isFailingWorker; + + private double uniformRank; + + private double rescaleFactor; + + @Override + public void open(Configuration parameters) throws Exception { + + + int currentIteration = ConfigUtils.asInteger("pact.iterations.currentIteration", parameters); + int failingIteration = ConfigUtils.asInteger("compensation.failingIteration", parameters); + isFailureIteration = currentIteration == failingIteration + 1; + + int workerIndex = ConfigUtils.asInteger("pact.parallel.task.id", parameters); + Set failingWorkers = ConfigUtils.asIntSet("compensation.failingWorker", parameters); + isFailingWorker = failingWorkers.contains(workerIndex); + + long numVertices = ConfigUtils.asLong("pageRank.numVertices", parameters); + + if (currentIteration > 1) { + PageRankStats stats = (PageRankStats) IterationContext.instance().getGlobalAggregate(workerIndex); + + uniformRank = 1d / (double) numVertices; + double lostMassFactor = (numVertices - stats.numVertices()) / (double) numVertices; + rescaleFactor = (1 - lostMassFactor) / stats.rank(); + } + } + + @Override + public void map(VertexWithRankAndDangling pageWithRank, Collector out) throws Exception { + + if (isFailureIteration) { + double rank = pageWithRank.getRank(); + + if (isFailingWorker) { + pageWithRank.setRank(uniformRank); + } else { + pageWithRank.setRank(rank * rescaleFactor); + } + } + out.collect(pageWithRank); + } +} diff --git a/pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/custom/CustomImprovedAdjacencyListInputFormat.java b/pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/custom/CustomImprovedAdjacencyListInputFormat.java new file mode 100644 index 0000000000000..ed4ec2c4c6b20 --- /dev/null +++ b/pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/custom/CustomImprovedAdjacencyListInputFormat.java @@ -0,0 +1,46 @@ +package eu.stratosphere.pact.runtime.iterative.compensatable.danglingpagerank.custom; + +import eu.stratosphere.pact.generic.io.DelimitedInputFormat; +import eu.stratosphere.pact.runtime.iterative.compensatable.danglingpagerank.AsciiLongArrayView; +import eu.stratosphere.pact.runtime.iterative.compensatable.danglingpagerank.custom.types.VertexWithAdjacencyList; + +public class CustomImprovedAdjacencyListInputFormat extends DelimitedInputFormat { + + private final AsciiLongArrayView arrayView = new AsciiLongArrayView(); + + @Override + public boolean readRecord(VertexWithAdjacencyList target, byte[] bytes, int offset, int numBytes) { + + if (numBytes == 0) { + return false; + } + + arrayView.set(bytes, offset, numBytes); + + long[] list = target.getTargets(); + + try { + + int pos = 0; + while (arrayView.next()) { + + if (pos == 0) { + target.setVertexID(arrayView.element()); + } else { + if (list.length <= pos - 1) { + list = new long[list.length < 16 ? 16 : list.length * 2]; + target.setTargets(list); + } + list[pos - 1] = arrayView.element(); + } + pos++; + } + + target.setNumTargets(pos - 1); + } catch (RuntimeException e) { + throw new RuntimeException("Error parsing: " + arrayView.toString(), e); + } + + return true; + } +} diff --git a/pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/custom/CustomImprovedDanglingPageRankInputFormat.java b/pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/custom/CustomImprovedDanglingPageRankInputFormat.java new file mode 100644 index 0000000000000..d9cf6e409c849 --- /dev/null +++ b/pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/custom/CustomImprovedDanglingPageRankInputFormat.java @@ -0,0 +1,47 @@ +package eu.stratosphere.pact.runtime.iterative.compensatable.danglingpagerank.custom; + +import eu.stratosphere.nephele.configuration.Configuration; +import eu.stratosphere.pact.generic.io.DelimitedInputFormat; +import eu.stratosphere.pact.runtime.iterative.compensatable.ConfigUtils; +import eu.stratosphere.pact.runtime.iterative.compensatable.danglingpagerank.AsciiLongArrayView; +import eu.stratosphere.pact.runtime.iterative.compensatable.danglingpagerank.custom.types.VertexWithRankAndDangling; + +public class CustomImprovedDanglingPageRankInputFormat extends DelimitedInputFormat { + + private AsciiLongArrayView arrayView = new AsciiLongArrayView(); + + private static final long DANGLING_MARKER = 1l; + + private double initialRank; + + @Override + public void configure(Configuration parameters) { + long numVertices = ConfigUtils.asLong("pageRank.numVertices", parameters); + initialRank = 1.0 / numVertices; + super.configure(parameters); + } + + @Override + public boolean readRecord(VertexWithRankAndDangling target, byte[] bytes, int offset, int numBytes) { + + arrayView.set(bytes, offset, numBytes); + + try { + arrayView.next(); + target.setVertexID(arrayView.element()); + + if (arrayView.next()) { + target.setDangling(arrayView.element() == DANGLING_MARKER); + } else { + target.setDangling(false); + } + + } catch (NumberFormatException e) { + throw new RuntimeException("Error parsing " + arrayView.toString(), e); + } + + target.setRank(initialRank); + + return true; + } +} diff --git a/pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/custom/CustomPageWithRankOutFormat.java b/pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/custom/CustomPageWithRankOutFormat.java new file mode 100644 index 0000000000000..ac705d103a8d8 --- /dev/null +++ b/pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/custom/CustomPageWithRankOutFormat.java @@ -0,0 +1,40 @@ +/*********************************************************************************************************************** + * + * Copyright (C) 2012 by the Stratosphere project (http://stratosphere.eu) + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + * + **********************************************************************************************************************/ + +package eu.stratosphere.pact.runtime.iterative.compensatable.danglingpagerank.custom; + +import com.google.common.base.Charsets; + +import eu.stratosphere.pact.generic.io.FileOutputFormat; +import eu.stratosphere.pact.runtime.iterative.compensatable.danglingpagerank.custom.types.VertexWithRankAndDangling; + +import java.io.IOException; + +public class CustomPageWithRankOutFormat extends FileOutputFormat { + + private final StringBuilder buffer = new StringBuilder(); + + @Override + public void writeRecord(VertexWithRankAndDangling record) throws IOException { + buffer.setLength(0); + buffer.append(record.getVertexID()); + buffer.append('\t'); + buffer.append(record.getRank()); + buffer.append('\n'); + + byte[] bytes = buffer.toString().getBytes(Charsets.UTF_8); + stream.write(bytes); + } +} diff --git a/pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/types/NodeWithAdjacencyList.java b/pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/custom/types/VertexWithAdjacencyList.java similarity index 80% rename from pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/types/NodeWithAdjacencyList.java rename to pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/custom/types/VertexWithAdjacencyList.java index 5021676cc2ad6..1624793f62968 100644 --- a/pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/types/NodeWithAdjacencyList.java +++ b/pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/custom/types/VertexWithAdjacencyList.java @@ -12,38 +12,38 @@ * specific language governing permissions and limitations under the License. * **********************************************************************************************************************/ -package eu.stratosphere.pact.runtime.iterative.compensatable.danglingpagerank.types; +package eu.stratosphere.pact.runtime.iterative.compensatable.danglingpagerank.custom.types; /** * */ -public final class NodeWithAdjacencyList { +public final class VertexWithAdjacencyList { private static final long[] EMPTY = new long[0]; - private long nodeId; + private long vertexID; private long[] targets; private int numTargets; - public NodeWithAdjacencyList() { + public VertexWithAdjacencyList() { this.targets = EMPTY; } - public NodeWithAdjacencyList(long nodeId, long[] targets) { - this.nodeId = nodeId; + public VertexWithAdjacencyList(long vertexID, long[] targets) { + this.vertexID = vertexID; this.targets = targets; } - public long getNodeId() { - return nodeId; + public long getVertexID() { + return vertexID; } - public void setNodeId(long nodeId) { - this.nodeId = nodeId; + public void setVertexID(long vertexID) { + this.vertexID = vertexID; } public long[] getTargets() { diff --git a/pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/types/NodeWithRankAndDanglingComparator.java b/pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/custom/types/VertexWithAdjacencyListComparator.java similarity index 78% rename from pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/types/NodeWithRankAndDanglingComparator.java rename to pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/custom/types/VertexWithAdjacencyListComparator.java index ee5c6e4e70b40..8811d0cc5e824 100644 --- a/pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/types/NodeWithRankAndDanglingComparator.java +++ b/pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/custom/types/VertexWithAdjacencyListComparator.java @@ -12,7 +12,7 @@ * specific language governing permissions and limitations under the License. * **********************************************************************************************************************/ -package eu.stratosphere.pact.runtime.iterative.compensatable.danglingpagerank.types; +package eu.stratosphere.pact.runtime.iterative.compensatable.danglingpagerank.custom.types; import java.io.IOException; @@ -23,29 +23,29 @@ /** * */ -public final class NodeWithRankAndDanglingComparator implements TypeComparator { +public final class VertexWithAdjacencyListComparator implements TypeComparator { private long reference; @Override - public int hash(NodeWithRankAndDangling record) { - final long value = record.getNodeId(); + public int hash(VertexWithAdjacencyList record) { + final long value = record.getVertexID(); return 43 + (int) (value ^ value >>> 32); } @Override - public void setReference(NodeWithRankAndDangling toCompare) { - this.reference = toCompare.getNodeId(); + public void setReference(VertexWithAdjacencyList toCompare) { + this.reference = toCompare.getVertexID(); } @Override - public boolean equalToReference(NodeWithRankAndDangling candidate) { - return candidate.getNodeId() == this.reference; + public boolean equalToReference(VertexWithAdjacencyList candidate) { + return candidate.getVertexID() == this.reference; } @Override - public int compareToReference(TypeComparator referencedComparator) { - NodeWithRankAndDanglingComparator comp = (NodeWithRankAndDanglingComparator) referencedComparator; + public int compareToReference(TypeComparator referencedComparator) { + VertexWithAdjacencyListComparator comp = (VertexWithAdjacencyListComparator) referencedComparator; final long diff = comp.reference - this.reference; return diff < 0 ? -1 : diff > 0 ? 1 : 0; } @@ -72,8 +72,8 @@ public boolean isNormalizedKeyPrefixOnly(int keyBytes) { } @Override - public void putNormalizedKey(NodeWithRankAndDangling record, byte[] target, int offset, int len) { - final long value = record.getNodeId(); + public void putNormalizedKey(VertexWithAdjacencyList record, byte[] target, int offset, int len) { + final long value = record.getVertexID(); if (len == 8) { // default case, full normalized key @@ -122,7 +122,7 @@ public boolean invertNormalizedKey() { } @Override - public NodeWithRankAndDanglingComparator duplicate() { - return new NodeWithRankAndDanglingComparator(); + public VertexWithAdjacencyListComparator duplicate() { + return new VertexWithAdjacencyListComparator(); } } diff --git a/pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/types/NodeWithRankAndDanglingComparatorFactory.java b/pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/custom/types/VertexWithAdjacencyListComparatorFactory.java similarity index 83% rename from pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/types/NodeWithRankAndDanglingComparatorFactory.java rename to pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/custom/types/VertexWithAdjacencyListComparatorFactory.java index 31bc55e91132d..dc8d7dc7fb523 100644 --- a/pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/types/NodeWithRankAndDanglingComparatorFactory.java +++ b/pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/custom/types/VertexWithAdjacencyListComparatorFactory.java @@ -12,7 +12,7 @@ * specific language governing permissions and limitations under the License. * **********************************************************************************************************************/ -package eu.stratosphere.pact.runtime.iterative.compensatable.danglingpagerank.types; +package eu.stratosphere.pact.runtime.iterative.compensatable.danglingpagerank.custom.types; import eu.stratosphere.nephele.configuration.Configuration; import eu.stratosphere.pact.generic.types.TypeComparatorFactory; @@ -20,7 +20,7 @@ /** * */ -public final class NodeWithRankAndDanglingComparatorFactory implements TypeComparatorFactory { +public final class VertexWithAdjacencyListComparatorFactory implements TypeComparatorFactory { @Override public void writeParametersToConfig(Configuration config) {} @@ -29,7 +29,7 @@ public void writeParametersToConfig(Configuration config) {} public void readParametersFromConfig(Configuration config, ClassLoader cl) throws ClassNotFoundException {} @Override - public NodeWithRankAndDanglingComparator createComparator() { - return new NodeWithRankAndDanglingComparator(); + public VertexWithAdjacencyListComparator createComparator() { + return new VertexWithAdjacencyListComparator(); } } diff --git a/pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/types/NodeWithAdjacencyListSerializer.java b/pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/custom/types/VertexWithAdjacencyListSerializer.java similarity index 77% rename from pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/types/NodeWithAdjacencyListSerializer.java rename to pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/custom/types/VertexWithAdjacencyListSerializer.java index 5d552dcad609b..f42a3cec2084d 100644 --- a/pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/types/NodeWithAdjacencyListSerializer.java +++ b/pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/custom/types/VertexWithAdjacencyListSerializer.java @@ -12,7 +12,7 @@ * specific language governing permissions and limitations under the License. * **********************************************************************************************************************/ -package eu.stratosphere.pact.runtime.iterative.compensatable.danglingpagerank.types; +package eu.stratosphere.pact.runtime.iterative.compensatable.danglingpagerank.custom.types; import java.io.IOException; @@ -24,32 +24,32 @@ /** * */ -public final class NodeWithAdjacencyListSerializer implements TypeSerializer { +public final class VertexWithAdjacencyListSerializer implements TypeSerializer { @Override - public NodeWithAdjacencyList createInstance() { - return new NodeWithAdjacencyList(); + public VertexWithAdjacencyList createInstance() { + return new VertexWithAdjacencyList(); } @Override - public NodeWithAdjacencyList createCopy(NodeWithAdjacencyList from) { + public VertexWithAdjacencyList createCopy(VertexWithAdjacencyList from) { long[] targets = new long[from.getTargets().length]; System.arraycopy(from.getTargets(), 0, targets, 0, targets.length); - NodeWithAdjacencyList copy = new NodeWithAdjacencyList(); - copy.setNodeId(from.getNodeId()); + VertexWithAdjacencyList copy = new VertexWithAdjacencyList(); + copy.setVertexID(from.getVertexID()); copy.setNumTargets(from.getNumTargets()); copy.setTargets(targets); return copy; } @Override - public void copyTo(NodeWithAdjacencyList from, NodeWithAdjacencyList to) { + public void copyTo(VertexWithAdjacencyList from, VertexWithAdjacencyList to) { if (to.getTargets().length < from.getTargets().length) { to.setTargets(new long[from.getTargets().length]); } - to.setNodeId(from.getNodeId()); + to.setVertexID(from.getVertexID()); to.setNumTargets(from.getNumTargets()); System.arraycopy(from.getTargets(), 0, to.getTargets(), 0, from.getNumTargets()); } @@ -60,8 +60,8 @@ public int getLength() { } @Override - public void serialize(NodeWithAdjacencyList record, DataOutputView target) throws IOException { - target.writeLong(record.getNodeId()); + public void serialize(VertexWithAdjacencyList record, DataOutputView target) throws IOException { + target.writeLong(record.getVertexID()); final long[] targets = record.getTargets(); final int numTargets = record.getNumTargets(); @@ -73,8 +73,8 @@ public void serialize(NodeWithAdjacencyList record, DataOutputView target) throw } @Override - public void deserialize(NodeWithAdjacencyList target, DataInputView source) throws IOException { - target.setNodeId(source.readLong()); + public void deserialize(VertexWithAdjacencyList target, DataInputView source) throws IOException { + target.setVertexID(source.readLong()); final int numTargets = source.readInt(); long[] targets = target.getTargets(); diff --git a/pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/types/NodeWithRankAndDanglingSerializerFactory.java b/pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/custom/types/VertexWithAdjacencyListSerializerFactory.java similarity index 76% rename from pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/types/NodeWithRankAndDanglingSerializerFactory.java rename to pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/custom/types/VertexWithAdjacencyListSerializerFactory.java index cff5d8fb0662f..82001846a466e 100644 --- a/pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/types/NodeWithRankAndDanglingSerializerFactory.java +++ b/pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/custom/types/VertexWithAdjacencyListSerializerFactory.java @@ -12,7 +12,7 @@ * specific language governing permissions and limitations under the License. * **********************************************************************************************************************/ -package eu.stratosphere.pact.runtime.iterative.compensatable.danglingpagerank.types; +package eu.stratosphere.pact.runtime.iterative.compensatable.danglingpagerank.custom.types; import eu.stratosphere.nephele.configuration.Configuration; import eu.stratosphere.pact.generic.types.TypeSerializerFactory; @@ -20,9 +20,9 @@ /** * */ -public final class NodeWithRankAndDanglingSerializerFactory implements TypeSerializerFactory { +public final class VertexWithAdjacencyListSerializerFactory implements TypeSerializerFactory { - private static final NodeWithRankAndDanglingSerializer INSTANCE = new NodeWithRankAndDanglingSerializer(); + private static final VertexWithAdjacencyListSerializer INSTANCE = new VertexWithAdjacencyListSerializer(); @Override public void writeParametersToConfig(Configuration config) {} @@ -31,12 +31,12 @@ public void writeParametersToConfig(Configuration config) {} public void readParametersFromConfig(Configuration config, ClassLoader cl) throws ClassNotFoundException {} @Override - public NodeWithRankAndDanglingSerializer getSerializer() { + public VertexWithAdjacencyListSerializer getSerializer() { return INSTANCE; } @Override - public Class getDataType() { - return NodeWithRankAndDangling.class; + public Class getDataType() { + return VertexWithAdjacencyList.class; } } diff --git a/pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/types/NodeWithRank.java b/pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/custom/types/VertexWithRank.java similarity index 78% rename from pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/types/NodeWithRank.java rename to pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/custom/types/VertexWithRank.java index 5b2290cbaabb6..b568c09189a7b 100644 --- a/pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/types/NodeWithRank.java +++ b/pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/custom/types/VertexWithRank.java @@ -12,34 +12,34 @@ * specific language governing permissions and limitations under the License. * **********************************************************************************************************************/ -package eu.stratosphere.pact.runtime.iterative.compensatable.danglingpagerank.types; +package eu.stratosphere.pact.runtime.iterative.compensatable.danglingpagerank.custom.types; /** * */ -public final class NodeWithRank { +public final class VertexWithRank { - private long nodeId; + private long vertexID; private double rank; - public NodeWithRank() { + public VertexWithRank() { } - public NodeWithRank(long nodeId, double rank) { - this.nodeId = nodeId; + public VertexWithRank(long vertexID, double rank) { + this.vertexID = vertexID; this.rank = rank; } - public long getNodeId() { - return nodeId; + public long getVertexID() { + return vertexID; } - public void setNodeId(long nodeId) { - this.nodeId = nodeId; + public void setVertexID(long vertexID) { + this.vertexID = vertexID; } public double getRank() { diff --git a/pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/types/NodeWithRankAndDangling.java b/pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/custom/types/VertexWithRankAndDangling.java similarity index 78% rename from pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/types/NodeWithRankAndDangling.java rename to pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/custom/types/VertexWithRankAndDangling.java index 18d4a22d5da47..de7c261b5b14f 100644 --- a/pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/types/NodeWithRankAndDangling.java +++ b/pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/custom/types/VertexWithRankAndDangling.java @@ -12,37 +12,37 @@ * specific language governing permissions and limitations under the License. * **********************************************************************************************************************/ -package eu.stratosphere.pact.runtime.iterative.compensatable.danglingpagerank.types; +package eu.stratosphere.pact.runtime.iterative.compensatable.danglingpagerank.custom.types; /** * */ -public final class NodeWithRankAndDangling { +public final class VertexWithRankAndDangling { - private long nodeId; + private long vertexID; private double rank; private boolean dangling; - public NodeWithRankAndDangling() { + public VertexWithRankAndDangling() { } - public NodeWithRankAndDangling(long nodeId, double rank, boolean dangling) { - this.nodeId = nodeId; + public VertexWithRankAndDangling(long vertexID, double rank, boolean dangling) { + this.vertexID = vertexID; this.rank = rank; this.dangling = dangling; } - public long getNodeId() { - return nodeId; + public long getVertexID() { + return vertexID; } - public void setNodeId(long nodeId) { - this.nodeId = nodeId; + public void setVertexID(long vertexID) { + this.vertexID = vertexID; } public double getRank() { diff --git a/pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/types/NodeWithAdjacencyListComparator.java b/pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/custom/types/VertexWithRankAndDanglingComparator.java similarity index 77% rename from pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/types/NodeWithAdjacencyListComparator.java rename to pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/custom/types/VertexWithRankAndDanglingComparator.java index 3f7234825f82a..37706a2608dd9 100644 --- a/pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/types/NodeWithAdjacencyListComparator.java +++ b/pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/custom/types/VertexWithRankAndDanglingComparator.java @@ -12,7 +12,7 @@ * specific language governing permissions and limitations under the License. * **********************************************************************************************************************/ -package eu.stratosphere.pact.runtime.iterative.compensatable.danglingpagerank.types; +package eu.stratosphere.pact.runtime.iterative.compensatable.danglingpagerank.custom.types; import java.io.IOException; @@ -23,29 +23,29 @@ /** * */ -public final class NodeWithAdjacencyListComparator implements TypeComparator { +public final class VertexWithRankAndDanglingComparator implements TypeComparator { private long reference; @Override - public int hash(NodeWithAdjacencyList record) { - final long value = record.getNodeId(); + public int hash(VertexWithRankAndDangling record) { + final long value = record.getVertexID(); return 43 + (int) (value ^ value >>> 32); } @Override - public void setReference(NodeWithAdjacencyList toCompare) { - this.reference = toCompare.getNodeId(); + public void setReference(VertexWithRankAndDangling toCompare) { + this.reference = toCompare.getVertexID(); } @Override - public boolean equalToReference(NodeWithAdjacencyList candidate) { - return candidate.getNodeId() == this.reference; + public boolean equalToReference(VertexWithRankAndDangling candidate) { + return candidate.getVertexID() == this.reference; } @Override - public int compareToReference(TypeComparator referencedComparator) { - NodeWithAdjacencyListComparator comp = (NodeWithAdjacencyListComparator) referencedComparator; + public int compareToReference(TypeComparator referencedComparator) { + VertexWithRankAndDanglingComparator comp = (VertexWithRankAndDanglingComparator) referencedComparator; final long diff = comp.reference - this.reference; return diff < 0 ? -1 : diff > 0 ? 1 : 0; } @@ -72,8 +72,8 @@ public boolean isNormalizedKeyPrefixOnly(int keyBytes) { } @Override - public void putNormalizedKey(NodeWithAdjacencyList record, byte[] target, int offset, int len) { - final long value = record.getNodeId(); + public void putNormalizedKey(VertexWithRankAndDangling record, byte[] target, int offset, int len) { + final long value = record.getVertexID(); if (len == 8) { // default case, full normalized key @@ -122,7 +122,7 @@ public boolean invertNormalizedKey() { } @Override - public NodeWithAdjacencyListComparator duplicate() { - return new NodeWithAdjacencyListComparator(); + public VertexWithRankAndDanglingComparator duplicate() { + return new VertexWithRankAndDanglingComparator(); } } diff --git a/pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/types/NodeWithAdjacencyListComparatorFactory.java b/pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/custom/types/VertexWithRankAndDanglingComparatorFactory.java similarity index 82% rename from pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/types/NodeWithAdjacencyListComparatorFactory.java rename to pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/custom/types/VertexWithRankAndDanglingComparatorFactory.java index 0aca5d450ddff..3dcfa400e873c 100644 --- a/pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/types/NodeWithAdjacencyListComparatorFactory.java +++ b/pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/custom/types/VertexWithRankAndDanglingComparatorFactory.java @@ -12,7 +12,7 @@ * specific language governing permissions and limitations under the License. * **********************************************************************************************************************/ -package eu.stratosphere.pact.runtime.iterative.compensatable.danglingpagerank.types; +package eu.stratosphere.pact.runtime.iterative.compensatable.danglingpagerank.custom.types; import eu.stratosphere.nephele.configuration.Configuration; import eu.stratosphere.pact.generic.types.TypeComparatorFactory; @@ -20,7 +20,7 @@ /** * */ -public final class NodeWithAdjacencyListComparatorFactory implements TypeComparatorFactory { +public final class VertexWithRankAndDanglingComparatorFactory implements TypeComparatorFactory { @Override public void writeParametersToConfig(Configuration config) {} @@ -29,7 +29,7 @@ public void writeParametersToConfig(Configuration config) {} public void readParametersFromConfig(Configuration config, ClassLoader cl) throws ClassNotFoundException {} @Override - public NodeWithAdjacencyListComparator createComparator() { - return new NodeWithAdjacencyListComparator(); + public VertexWithRankAndDanglingComparator createComparator() { + return new VertexWithRankAndDanglingComparator(); } } diff --git a/pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/types/NodeWithRankAndDanglingSerializer.java b/pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/custom/types/VertexWithRankAndDanglingSerializer.java similarity index 69% rename from pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/types/NodeWithRankAndDanglingSerializer.java rename to pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/custom/types/VertexWithRankAndDanglingSerializer.java index 115ec662b3293..7f1f4407f1aa8 100644 --- a/pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/types/NodeWithRankAndDanglingSerializer.java +++ b/pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/custom/types/VertexWithRankAndDanglingSerializer.java @@ -12,7 +12,7 @@ * specific language governing permissions and limitations under the License. * **********************************************************************************************************************/ -package eu.stratosphere.pact.runtime.iterative.compensatable.danglingpagerank.types; +package eu.stratosphere.pact.runtime.iterative.compensatable.danglingpagerank.custom.types; import java.io.IOException; @@ -24,16 +24,16 @@ /** * */ -public final class NodeWithRankAndDanglingSerializer implements TypeSerializer { +public final class VertexWithRankAndDanglingSerializer implements TypeSerializer { @Override - public NodeWithRankAndDangling createInstance() { - return new NodeWithRankAndDangling(); + public VertexWithRankAndDangling createInstance() { + return new VertexWithRankAndDangling(); } @Override - public NodeWithRankAndDangling createCopy(NodeWithRankAndDangling from) { - NodeWithRankAndDangling n = new NodeWithRankAndDangling(); + public VertexWithRankAndDangling createCopy(VertexWithRankAndDangling from) { + VertexWithRankAndDangling n = new VertexWithRankAndDangling(); copyTo(from, n); return n; } @@ -42,8 +42,8 @@ public NodeWithRankAndDangling createCopy(NodeWithRankAndDangling from) { * @see eu.stratosphere.pact.generic.types.TypeSerializer#copyTo(java.lang.Object, java.lang.Object) */ @Override - public void copyTo(NodeWithRankAndDangling from, NodeWithRankAndDangling to) { - to.setNodeId(from.getNodeId()); + public void copyTo(VertexWithRankAndDangling from, VertexWithRankAndDangling to) { + to.setVertexID(from.getVertexID()); to.setRank(from.getRank()); to.setDangling(from.isDangling()); } @@ -54,15 +54,15 @@ public int getLength() { } @Override - public void serialize(NodeWithRankAndDangling record, DataOutputView target) throws IOException { - target.writeLong(record.getNodeId()); + public void serialize(VertexWithRankAndDangling record, DataOutputView target) throws IOException { + target.writeLong(record.getVertexID()); target.writeDouble(record.getRank()); target.writeBoolean(record.isDangling()); } @Override - public void deserialize(NodeWithRankAndDangling target, DataInputView source) throws IOException { - target.setNodeId(source.readLong()); + public void deserialize(VertexWithRankAndDangling target, DataInputView source) throws IOException { + target.setVertexID(source.readLong()); target.setRank(source.readDouble()); target.setDangling(source.readBoolean()); } diff --git a/pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/types/NodeWithAdjacencyListSerializerFactory.java b/pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/custom/types/VertexWithRankAndDanglingSerializerFactory.java similarity index 75% rename from pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/types/NodeWithAdjacencyListSerializerFactory.java rename to pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/custom/types/VertexWithRankAndDanglingSerializerFactory.java index e86839c6b9a83..34bd9eb80b5d5 100644 --- a/pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/types/NodeWithAdjacencyListSerializerFactory.java +++ b/pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/custom/types/VertexWithRankAndDanglingSerializerFactory.java @@ -12,7 +12,7 @@ * specific language governing permissions and limitations under the License. * **********************************************************************************************************************/ -package eu.stratosphere.pact.runtime.iterative.compensatable.danglingpagerank.types; +package eu.stratosphere.pact.runtime.iterative.compensatable.danglingpagerank.custom.types; import eu.stratosphere.nephele.configuration.Configuration; import eu.stratosphere.pact.generic.types.TypeSerializerFactory; @@ -20,9 +20,9 @@ /** * */ -public final class NodeWithAdjacencyListSerializerFactory implements TypeSerializerFactory { +public final class VertexWithRankAndDanglingSerializerFactory implements TypeSerializerFactory { - private static final NodeWithAdjacencyListSerializer INSTANCE = new NodeWithAdjacencyListSerializer(); + private static final VertexWithRankAndDanglingSerializer INSTANCE = new VertexWithRankAndDanglingSerializer(); @Override public void writeParametersToConfig(Configuration config) {} @@ -31,12 +31,12 @@ public void writeParametersToConfig(Configuration config) {} public void readParametersFromConfig(Configuration config, ClassLoader cl) throws ClassNotFoundException {} @Override - public NodeWithAdjacencyListSerializer getSerializer() { + public VertexWithRankAndDanglingSerializer getSerializer() { return INSTANCE; } @Override - public Class getDataType() { - return NodeWithAdjacencyList.class; + public Class getDataType() { + return VertexWithRankAndDangling.class; } } diff --git a/pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/types/NodeWithRankComparator.java b/pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/custom/types/VertexWithRankComparator.java similarity index 79% rename from pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/types/NodeWithRankComparator.java rename to pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/custom/types/VertexWithRankComparator.java index 2956eb963709c..a92f5cce92454 100644 --- a/pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/types/NodeWithRankComparator.java +++ b/pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/custom/types/VertexWithRankComparator.java @@ -12,7 +12,7 @@ * specific language governing permissions and limitations under the License. * **********************************************************************************************************************/ -package eu.stratosphere.pact.runtime.iterative.compensatable.danglingpagerank.types; +package eu.stratosphere.pact.runtime.iterative.compensatable.danglingpagerank.custom.types; import java.io.IOException; @@ -23,29 +23,29 @@ /** * */ -public final class NodeWithRankComparator implements TypeComparator { +public final class VertexWithRankComparator implements TypeComparator { private long reference; @Override - public int hash(NodeWithRank record) { - final long value = record.getNodeId(); + public int hash(VertexWithRank record) { + final long value = record.getVertexID(); return 43 + (int) (value ^ value >>> 32); } @Override - public void setReference(NodeWithRank toCompare) { - this.reference = toCompare.getNodeId(); + public void setReference(VertexWithRank toCompare) { + this.reference = toCompare.getVertexID(); } @Override - public boolean equalToReference(NodeWithRank candidate) { - return candidate.getNodeId() == this.reference; + public boolean equalToReference(VertexWithRank candidate) { + return candidate.getVertexID() == this.reference; } @Override - public int compareToReference(TypeComparator referencedComparator) { - NodeWithRankComparator comp = (NodeWithRankComparator) referencedComparator; + public int compareToReference(TypeComparator referencedComparator) { + VertexWithRankComparator comp = (VertexWithRankComparator) referencedComparator; final long diff = comp.reference - this.reference; return diff < 0 ? -1 : diff > 0 ? 1 : 0; } @@ -72,8 +72,8 @@ public boolean isNormalizedKeyPrefixOnly(int keyBytes) { } @Override - public void putNormalizedKey(NodeWithRank record, byte[] target, int offset, int len) { - final long value = record.getNodeId(); + public void putNormalizedKey(VertexWithRank record, byte[] target, int offset, int len) { + final long value = record.getVertexID(); if (len == 8) { // default case, full normalized key @@ -122,7 +122,7 @@ public boolean invertNormalizedKey() { } @Override - public NodeWithRankComparator duplicate() { - return new NodeWithRankComparator(); + public VertexWithRankComparator duplicate() { + return new VertexWithRankComparator(); } } diff --git a/pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/types/NodeWithRankComparatorFactory.java b/pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/custom/types/VertexWithRankComparatorFactory.java similarity index 85% rename from pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/types/NodeWithRankComparatorFactory.java rename to pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/custom/types/VertexWithRankComparatorFactory.java index e91a0b65f1401..0021e14ab32a9 100644 --- a/pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/types/NodeWithRankComparatorFactory.java +++ b/pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/custom/types/VertexWithRankComparatorFactory.java @@ -12,7 +12,7 @@ * specific language governing permissions and limitations under the License. * **********************************************************************************************************************/ -package eu.stratosphere.pact.runtime.iterative.compensatable.danglingpagerank.types; +package eu.stratosphere.pact.runtime.iterative.compensatable.danglingpagerank.custom.types; import eu.stratosphere.nephele.configuration.Configuration; import eu.stratosphere.pact.generic.types.TypeComparatorFactory; @@ -20,7 +20,7 @@ /** * */ -public final class NodeWithRankComparatorFactory implements TypeComparatorFactory { +public final class VertexWithRankComparatorFactory implements TypeComparatorFactory { @Override public void writeParametersToConfig(Configuration config) {} @@ -29,7 +29,7 @@ public void writeParametersToConfig(Configuration config) {} public void readParametersFromConfig(Configuration config, ClassLoader cl) throws ClassNotFoundException {} @Override - public NodeWithRankComparator createComparator() { - return new NodeWithRankComparator(); + public VertexWithRankComparator createComparator() { + return new VertexWithRankComparator(); } } diff --git a/pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/custom/types/VertexWithRankDanglingToVertexWithAdjacencyListPairComparatorFactory.java b/pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/custom/types/VertexWithRankDanglingToVertexWithAdjacencyListPairComparatorFactory.java new file mode 100644 index 0000000000000..60f457d061dd4 --- /dev/null +++ b/pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/custom/types/VertexWithRankDanglingToVertexWithAdjacencyListPairComparatorFactory.java @@ -0,0 +1,87 @@ +/*********************************************************************************************************************** + * + * Copyright (C) 2012 by the Stratosphere project (http://stratosphere.eu) + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + * + **********************************************************************************************************************/ +package eu.stratosphere.pact.runtime.iterative.compensatable.danglingpagerank.custom.types; + +import eu.stratosphere.pact.generic.types.TypeComparator; +import eu.stratosphere.pact.generic.types.TypePairComparator; +import eu.stratosphere.pact.generic.types.TypePairComparatorFactory; + + +/** + * + */ +public class VertexWithRankDanglingToVertexWithAdjacencyListPairComparatorFactory + implements TypePairComparatorFactory +{ + + @Override + public VertexWithRankDanglingToVertexWithAdjacencyListPairComparator createComparator12( + TypeComparator comparator1, TypeComparator comparator2) + { + return new VertexWithRankDanglingToVertexWithAdjacencyListPairComparator(); + } + + @Override + public VertexWithAdjacencyListToVertexWithRankDanglingPairComparator createComparator21( + TypeComparator comparator1, TypeComparator comparator2) + { + return new VertexWithAdjacencyListToVertexWithRankDanglingPairComparator(); + } + + + public static final class VertexWithRankDanglingToVertexWithAdjacencyListPairComparator + implements TypePairComparator + { + private long reference; + + @Override + public void setReference(VertexWithRankAndDangling reference) { + this.reference = reference.getVertexID(); + } + + @Override + public boolean equalToReference(VertexWithAdjacencyList candidate) { + return this.reference == candidate.getVertexID(); + } + + @Override + public int compareToReference(VertexWithAdjacencyList candidate) { + long diff = candidate.getVertexID() - this.reference; + return diff < 0 ? -1 : diff > 0 ? 1 : 0; + } + } + + public static final class VertexWithAdjacencyListToVertexWithRankDanglingPairComparator + implements TypePairComparator + { + private long reference; + + @Override + public void setReference(VertexWithAdjacencyList reference) { + this.reference = reference.getVertexID(); + } + + @Override + public boolean equalToReference(VertexWithRankAndDangling candidate) { + return this.reference == candidate.getVertexID(); + } + + @Override + public int compareToReference(VertexWithRankAndDangling candidate) { + long diff = candidate.getVertexID() - this.reference; + return diff < 0 ? -1 : diff > 0 ? 1 : 0; + } + } +} diff --git a/pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/custom/types/VertexWithRankDanglingToVertexWithRankPairComparatorFactory.java b/pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/custom/types/VertexWithRankDanglingToVertexWithRankPairComparatorFactory.java new file mode 100644 index 0000000000000..7563d304bf008 --- /dev/null +++ b/pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/custom/types/VertexWithRankDanglingToVertexWithRankPairComparatorFactory.java @@ -0,0 +1,87 @@ +/*********************************************************************************************************************** + * + * Copyright (C) 2012 by the Stratosphere project (http://stratosphere.eu) + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + * + **********************************************************************************************************************/ +package eu.stratosphere.pact.runtime.iterative.compensatable.danglingpagerank.custom.types; + +import eu.stratosphere.pact.generic.types.TypeComparator; +import eu.stratosphere.pact.generic.types.TypePairComparator; +import eu.stratosphere.pact.generic.types.TypePairComparatorFactory; + + +/** + * + */ +public class VertexWithRankDanglingToVertexWithRankPairComparatorFactory + implements TypePairComparatorFactory +{ + + @Override + public VertexWithRankDanglingToVertexWithRankComparator createComparator12( + TypeComparator comparator1, TypeComparator comparator2) + { + return new VertexWithRankDanglingToVertexWithRankComparator(); + } + + @Override + public VertexWithRankToVertexWithRankDanglingPairComparator createComparator21( + TypeComparator comparator1, TypeComparator comparator2) + { + return new VertexWithRankToVertexWithRankDanglingPairComparator(); + } + + + public static final class VertexWithRankDanglingToVertexWithRankComparator + implements TypePairComparator + { + private long reference; + + @Override + public void setReference(VertexWithRankAndDangling reference) { + this.reference = reference.getVertexID(); + } + + @Override + public boolean equalToReference(VertexWithRank candidate) { + return this.reference == candidate.getVertexID(); + } + + @Override + public int compareToReference(VertexWithRank candidate) { + long diff = candidate.getVertexID() - this.reference; + return diff < 0 ? -1 : diff > 0 ? 1 : 0; + } + } + + public static final class VertexWithRankToVertexWithRankDanglingPairComparator + implements TypePairComparator + { + private long reference; + + @Override + public void setReference(VertexWithRank reference) { + this.reference = reference.getVertexID(); + } + + @Override + public boolean equalToReference(VertexWithRankAndDangling candidate) { + return this.reference == candidate.getVertexID(); + } + + @Override + public int compareToReference(VertexWithRankAndDangling candidate) { + long diff = candidate.getVertexID() - this.reference; + return diff < 0 ? -1 : diff > 0 ? 1 : 0; + } + } +} diff --git a/pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/types/NodeWithRankSerializer.java b/pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/custom/types/VertexWithRankSerializer.java similarity index 72% rename from pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/types/NodeWithRankSerializer.java rename to pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/custom/types/VertexWithRankSerializer.java index fc62f2eb99467..3bb86636c4a21 100644 --- a/pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/types/NodeWithRankSerializer.java +++ b/pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/custom/types/VertexWithRankSerializer.java @@ -12,7 +12,7 @@ * specific language governing permissions and limitations under the License. * **********************************************************************************************************************/ -package eu.stratosphere.pact.runtime.iterative.compensatable.danglingpagerank.types; +package eu.stratosphere.pact.runtime.iterative.compensatable.danglingpagerank.custom.types; import java.io.IOException; @@ -24,16 +24,16 @@ /** * */ -public final class NodeWithRankSerializer implements TypeSerializer { +public final class VertexWithRankSerializer implements TypeSerializer { @Override - public NodeWithRank createInstance() { - return new NodeWithRank(); + public VertexWithRank createInstance() { + return new VertexWithRank(); } @Override - public NodeWithRank createCopy(NodeWithRank from) { - NodeWithRank n = new NodeWithRank(); + public VertexWithRank createCopy(VertexWithRank from) { + VertexWithRank n = new VertexWithRank(); copyTo(from, n); return n; } @@ -42,8 +42,8 @@ public NodeWithRank createCopy(NodeWithRank from) { * @see eu.stratosphere.pact.generic.types.TypeSerializer#copyTo(java.lang.Object, java.lang.Object) */ @Override - public void copyTo(NodeWithRank from, NodeWithRank to) { - to.setNodeId(from.getNodeId()); + public void copyTo(VertexWithRank from, VertexWithRank to) { + to.setVertexID(from.getVertexID()); to.setRank(from.getRank()); } @@ -53,14 +53,14 @@ public int getLength() { } @Override - public void serialize(NodeWithRank record, DataOutputView target) throws IOException { - target.writeLong(record.getNodeId()); + public void serialize(VertexWithRank record, DataOutputView target) throws IOException { + target.writeLong(record.getVertexID()); target.writeDouble(record.getRank()); } @Override - public void deserialize(NodeWithRank target, DataInputView source) throws IOException { - target.setNodeId(source.readLong()); + public void deserialize(VertexWithRank target, DataInputView source) throws IOException { + target.setVertexID(source.readLong()); target.setRank(source.readDouble()); } diff --git a/pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/types/NodeWithRankSerializerFactory.java b/pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/custom/types/VertexWithRankSerializerFactory.java similarity index 79% rename from pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/types/NodeWithRankSerializerFactory.java rename to pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/custom/types/VertexWithRankSerializerFactory.java index e289881b6e69c..f138097035c6f 100644 --- a/pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/types/NodeWithRankSerializerFactory.java +++ b/pact/pact-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/compensatable/danglingpagerank/custom/types/VertexWithRankSerializerFactory.java @@ -12,7 +12,7 @@ * specific language governing permissions and limitations under the License. * **********************************************************************************************************************/ -package eu.stratosphere.pact.runtime.iterative.compensatable.danglingpagerank.types; +package eu.stratosphere.pact.runtime.iterative.compensatable.danglingpagerank.custom.types; import eu.stratosphere.nephele.configuration.Configuration; import eu.stratosphere.pact.generic.types.TypeSerializerFactory; @@ -20,9 +20,9 @@ /** * */ -public final class NodeWithRankSerializerFactory implements TypeSerializerFactory { +public final class VertexWithRankSerializerFactory implements TypeSerializerFactory { - private static final NodeWithRankSerializer INSTANCE = new NodeWithRankSerializer(); + private static final VertexWithRankSerializer INSTANCE = new VertexWithRankSerializer(); @Override public void writeParametersToConfig(Configuration config) {} @@ -31,12 +31,12 @@ public void writeParametersToConfig(Configuration config) {} public void readParametersFromConfig(Configuration config, ClassLoader cl) throws ClassNotFoundException {} @Override - public NodeWithRankSerializer getSerializer() { + public VertexWithRankSerializer getSerializer() { return INSTANCE; } @Override - public Class getDataType() { - return NodeWithRank.class; + public Class getDataType() { + return VertexWithRank.class; } } diff --git a/pact/pact-tests/src/test/java/eu/stratosphere/pact/test/iterative/DanglingPageRankITCase.java b/pact/pact-tests/src/test/java/eu/stratosphere/pact/test/iterative/DanglingPageRankITCase.java index a466d52f04ae5..5de5b2db6d4b0 100644 --- a/pact/pact-tests/src/test/java/eu/stratosphere/pact/test/iterative/DanglingPageRankITCase.java +++ b/pact/pact-tests/src/test/java/eu/stratosphere/pact/test/iterative/DanglingPageRankITCase.java @@ -24,6 +24,7 @@ import eu.stratosphere.nephele.configuration.Configuration; import eu.stratosphere.nephele.jobgraph.JobGraph; +import eu.stratosphere.pact.runtime.iterative.compensatable.danglingpagerank.custom.CustomCompensatableDanglingPageRank; import eu.stratosphere.pact.test.util.TestBase; @RunWith(Parameterized.class) @@ -102,7 +103,7 @@ protected JobGraph getJobGraph() throws Exception { "0" }; - return CompensatableDanglingPageRank.getJobGraph(parameters); + return CustomCompensatableDanglingPageRank.getJobGraph(parameters); }