Skip to content

Commit

Permalink
fixed PageRank implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
asubmissions committed Oct 8, 2012
1 parent 5d5bb5a commit 1e5b9cd
Show file tree
Hide file tree
Showing 6 changed files with 55 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,21 +35,23 @@ record = new PactRecord();
/**
 * Joins one page-with-rank record with one transition-matrix entry and emits a partial rank
 * contribution for the entry's target vertex.
 *
 * <p>The emitted record holds field 1 of {@code transitionMatrixEntry} in field 0 and the product of
 * the rank (field 1 of {@code pageWithRank}) and the transition probability (field 2 of
 * {@code transitionMatrixEntry}) in field 1. The output is written into the {@code record} object
 * declared outside this method, so the same instance is handed to the collector on every call.
 *
 * @param pageWithRank record whose field 1 is read as a {@code PactDouble} rank
 * @param transitionMatrixEntry record whose field 1 is read as a {@code PactLong} and whose field 2 is
 *        read as a {@code PactDouble} transition probability
 * @param collector receives the output record
 * @throws Exception part of the declared signature; this method throws nothing explicitly
 */
public void match(PactRecord pageWithRank, PactRecord transitionMatrixEntry, Collector<PactRecord> collector)
    throws Exception {

  double rank = pageWithRank.getField(1, PactDouble.class).getValue();
  double transitionProbability = transitionMatrixEntry.getField(2, PactDouble.class).getValue();

  // Read the target vertex once and forward it as the output key. Only fields that contribute to
  // the output are read; the join keys (field 0 of either input) are not needed here.
  PactLong target = transitionMatrixEntry.getField(1, PactLong.class);

  record.setField(0, target);
  record.setField(1, new PactDouble(rank * transitionProbability));

  collector.collect(record);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,24 +23,26 @@ public void open(Configuration parameters) throws Exception {
/**
 * Sums field 1 ({@code PactDouble}) over all records of a group and emits a single record.
 *
 * <p>Field 0 of the emitted record is field 0 ({@code PactLong}) of the first record in the group;
 * field 1 is the sum. The result is written into the {@code accumulator} object declared outside
 * this method.
 *
 * @param records the records of one group; the first element is consumed unconditionally
 * @param collector receives the accumulated record
 * @throws Exception part of the declared signature; this method throws nothing explicitly
 */
@Override
public void reduce(Iterator<PactRecord> records, Collector<PactRecord> collector) throws Exception {

  // NOTE(review): the result of this call is intentionally not used; presumably the runtime's
  // iterator has to be advanced via hasNext() before the first next() -- confirm before removing.
  records.hasNext();
  final PactRecord head = records.next();

  accumulator.setField(0, head.getField(0, PactLong.class));

  double total = head.getField(1, PactDouble.class).getValue();
  for (; records.hasNext();) {
    final PactRecord next = records.next();
    total += next.getField(1, PactDouble.class).getValue();
  }

  accumulator.setField(1, new PactDouble(total));
  collector.collect(accumulator);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import eu.stratosphere.pact.common.type.base.PactLong;
import eu.stratosphere.pact.runtime.iterative.playing.JobGraphUtils;
import eu.stratosphere.pact.runtime.iterative.playing.PlayConstants;
import eu.stratosphere.pact.runtime.iterative.playing.Utils;
import eu.stratosphere.pact.runtime.iterative.task.IterationHeadPactTask;
import eu.stratosphere.pact.runtime.iterative.task.IterationIntermediatePactTask;
import eu.stratosphere.pact.runtime.iterative.task.IterationTailPactTask;
Expand Down Expand Up @@ -66,15 +65,19 @@ public static void main(String[] args) throws Exception {

JobInputVertex pageWithRankInput = JobGraphUtils.createInput(PageWithRankInputFormat.class, pageWithRankInputPath,
"PageWithRankInput", jobGraph, degreeOfParallelism, numSubTasksPerInstance);
TaskConfig pageWithRankInputConfig = new TaskConfig(pageWithRankInput.getConfiguration());
pageWithRankInputConfig.setComparatorFactoryForOutput(PactRecordComparatorFactory.class, 0);
PactRecordComparatorFactory.writeComparatorSetupToConfig(pageWithRankInputConfig.getConfigForOutputParameters(0),
new int[] { 0 }, new Class[] { PactLong.class }, new boolean[] { true });

JobInputVertex transitionMatrixInput = JobGraphUtils.createInput(TransitionMatrixInputFormat.class,
transitionMatrixInputPath, "TransitionMatrixInput", jobGraph, degreeOfParallelism, numSubTasksPerInstance);
TaskConfig transitionMatrixInputConfig = new TaskConfig(transitionMatrixInput.getConfiguration());
transitionMatrixInputConfig.setComparatorFactoryForOutput(PactRecordComparatorFactory.class, 0);
PactRecordComparatorFactory.writeComparatorSetupToConfig(transitionMatrixInputConfig.getConfigForOutputParameters(0),
new int[] { 1 }, new Class[] { PactLong.class }, new boolean[] { true });
new int[] { 0 }, new Class[] { PactLong.class }, new boolean[] { true });

JobTaskVertex head = JobGraphUtils.createTask(IterationHeadPactTask.class, "BulkIterationHead", jobGraph,
JobTaskVertex head = JobGraphUtils.createTask(IterationHeadPactTask.class, "IterationHead", jobGraph,
degreeOfParallelism, numSubTasksPerInstance);
TaskConfig headConfig = new TaskConfig(head.getConfiguration());
headConfig.setDriver(MapDriver.class);
Expand All @@ -83,24 +86,23 @@ public static void main(String[] args) throws Exception {
headConfig.setBackChannelMemoryFraction(0.8f);

JobTaskVertex intermediate = JobGraphUtils.createTask(IterationIntermediatePactTask.class,
"BulkIterationIntermediate", jobGraph, degreeOfParallelism, numSubTasksPerInstance);
"IterationIntermediate", jobGraph, degreeOfParallelism, numSubTasksPerInstance);
TaskConfig intermediateConfig = new TaskConfig(intermediate.getConfiguration());
//intermediateConfig.setDriver(MatchDriver.class);
intermediateConfig.setDriver(RepeatableHashJoinMatchDriver.class);
intermediateConfig.setStubClass(DotProductMatch.class);
//intermediateConfig.setLocalStrategy(TaskConfig.LocalStrategy.HYBRIDHASH_FIRST);
PactRecordComparatorFactory.writeComparatorSetupToConfig(intermediateConfig.getConfigForInputParameters(0),
new int[] { 0 }, new Class[] { PactLong.class }, new boolean[] { true });
PactRecordComparatorFactory.writeComparatorSetupToConfig(intermediateConfig.getConfigForInputParameters(1),
new int[] { 0 }, new Class[] { PactLong.class }, new boolean[] { true });
intermediateConfig.setMemorySize(memoryPerTask * JobGraphUtils.MEGABYTE);
//intermediateConfig.setGateCached(1);
//intermediateConfig.setInputGateCacheMemoryFraction(0.5f);
intermediateConfig.setComparatorFactoryForOutput(PactRecordComparatorFactory.class, 0);
PactRecordComparatorFactory.writeComparatorSetupToConfig(intermediateConfig.getConfigForOutputParameters(0),
new int[] { 0 }, new Class[] { PactLong.class }, new boolean[] { true });

JobTaskVertex tail = JobGraphUtils.createTask(IterationTailPactTask.class, "BulkIterationTail", jobGraph,
JobTaskVertex tail = JobGraphUtils.createTask(IterationTailPactTask.class, "IterationTail", jobGraph,
degreeOfParallelism, numSubTasksPerInstance);
TaskConfig tailConfig = new TaskConfig(tail.getConfiguration());
tailConfig.setLocalStrategy(TaskConfig.LocalStrategy.SORT);
tailConfig.setLocalStrategy(TaskConfig.LocalStrategy.COMBININGSORT);
tailConfig.setDriver(ReduceDriver.class);
tailConfig.setStubClass(DotProductReducer.class);
PactRecordComparatorFactory.writeComparatorSetupToConfig(tailConfig.getConfigForInputParameters(0), new int[] { 0 },
Expand All @@ -122,8 +124,8 @@ public static void main(String[] args) throws Exception {
numSubTasksPerInstance);

//TODO implicit order should be documented/configured somehow
JobGraphUtils.connect(pageWithRankInput, head, ChannelType.INMEMORY, DistributionPattern.POINTWISE,
ShipStrategyType.FORWARD);
JobGraphUtils.connect(pageWithRankInput, head, ChannelType.NETWORK, DistributionPattern.BIPARTITE,
ShipStrategyType.PARTITION_HASH);
JobGraphUtils.connect(head, intermediate, ChannelType.NETWORK, DistributionPattern.BIPARTITE,
ShipStrategyType.BROADCAST);
JobGraphUtils.connect(transitionMatrixInput, intermediate, ChannelType.NETWORK, DistributionPattern.BIPARTITE,
Expand All @@ -137,9 +139,9 @@ public static void main(String[] args) throws Exception {
JobGraphUtils.connect(tail, fakeTailOutput, ChannelType.INMEMORY, DistributionPattern.POINTWISE,
ShipStrategyType.FORWARD);

JobGraphUtils.connect(intermediate, tail, ChannelType.NETWORK, DistributionPattern.POINTWISE,
ShipStrategyType.FORWARD);
tailConfig.setGateIterativeWithNumberOfEventsUntilInterrupt(0, 1);
JobGraphUtils.connect(intermediate, tail, ChannelType.NETWORK, DistributionPattern.BIPARTITE,
ShipStrategyType.PARTITION_HASH);
tailConfig.setGateIterativeWithNumberOfEventsUntilInterrupt(0, degreeOfParallelism);

fakeTailOutput.setVertexToShareInstancesWith(tail);
tail.setVertexToShareInstancesWith(head);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import eu.stratosphere.pact.runtime.iterative.task.IterationHeadPactTask;
import eu.stratosphere.pact.runtime.iterative.task.IterationIntermediatePactTask;
import eu.stratosphere.pact.runtime.iterative.task.IterationTailPactTask;
import eu.stratosphere.pact.runtime.iterative.task.RepeatableHashJoinMatchDriver;
import eu.stratosphere.pact.runtime.plugable.PactRecordComparatorFactory;
import eu.stratosphere.pact.runtime.shipping.ShipStrategy;
import eu.stratosphere.pact.runtime.task.MapDriver;
Expand All @@ -48,7 +49,7 @@ public static void main(String[] args) throws Exception {
String transitionMatrixInputPath = "file://" + PlayConstants.PLAY_DIR + "test-inputs/pagerank/transitionMatrix";
String outputPath = "file:///tmp/stratosphere/iterations";
String confPath = PlayConstants.PLAY_DIR + "local-conf";
int memoryPerTask = 100;
int memoryPerTask = 25;

if (args.length == 7) {
degreeOfParallelism = Integer.parseInt(args[0]);
Expand All @@ -64,6 +65,10 @@ public static void main(String[] args) throws Exception {

JobInputVertex pageWithRankInput = JobGraphUtils.createInput(PageWithRankInputFormat.class, pageWithRankInputPath,
"PageWithRankInput", jobGraph, degreeOfParallelism, numSubTasksPerInstance);
TaskConfig pageWithRankInputConfig = new TaskConfig(pageWithRankInput.getConfiguration());
pageWithRankInputConfig.setComparatorFactoryForOutput(PactRecordComparatorFactory.class, 0);
PactRecordComparatorFactory.writeComparatorSetupToConfig(pageWithRankInputConfig.getConfigForOutputParameters(0),
new int[] { 0 }, new Class[] { PactLong.class }, new boolean[] { true });

JobInputVertex transitionMatrixInput = JobGraphUtils.createInput(TransitionMatrixInputFormat.class,
transitionMatrixInputPath, "TransitionMatrixInput", jobGraph, degreeOfParallelism, numSubTasksPerInstance);
Expand All @@ -72,7 +77,7 @@ public static void main(String[] args) throws Exception {
PactRecordComparatorFactory.writeComparatorSetupToConfig(transitionMatrixInputConfig.getConfigForOutputParameters(0),
new int[] { 1 }, new Class[] { PactLong.class }, new boolean[] { true });

JobTaskVertex head = JobGraphUtils.createTask(IterationHeadPactTask.class, "BulkIterationHead", jobGraph,
JobTaskVertex head = JobGraphUtils.createTask(IterationHeadPactTask.class, "IterationHead", jobGraph,
degreeOfParallelism, numSubTasksPerInstance);
TaskConfig headConfig = new TaskConfig(head.getConfiguration());
headConfig.setDriver(MapDriver.class);
Expand All @@ -81,18 +86,19 @@ public static void main(String[] args) throws Exception {
headConfig.setBackChannelMemoryFraction(0.8f);

JobTaskVertex intermediate = JobGraphUtils.createTask(IterationIntermediatePactTask.class,
"BulkIterationIntermediate", jobGraph, degreeOfParallelism, numSubTasksPerInstance);
"IterationIntermediate", jobGraph, degreeOfParallelism, numSubTasksPerInstance);
TaskConfig intermediateConfig = new TaskConfig(intermediate.getConfiguration());
intermediateConfig.setDriver(MatchDriver.class);
intermediateConfig.setDriver(RepeatableHashJoinMatchDriver.class);
intermediateConfig.setStubClass(DotProductMatch.class);
intermediateConfig.setLocalStrategy(TaskConfig.LocalStrategy.HYBRIDHASH_FIRST);
PactRecordComparatorFactory.writeComparatorSetupToConfig(intermediateConfig.getConfigForInputParameters(0),
new int[] { 0 }, new Class[] { PactLong.class }, new boolean[] { true });
PactRecordComparatorFactory.writeComparatorSetupToConfig(intermediateConfig.getConfigForInputParameters(1),
new int[] { 0 }, new Class[] { PactLong.class }, new boolean[] { true });
intermediateConfig.setMemorySize(memoryPerTask * JobGraphUtils.MEGABYTE);
intermediateConfig.setGateCached(1);
intermediateConfig.setInputGateCacheMemoryFraction(0.5f);
intermediateConfig.setComparatorFactoryForOutput(PactRecordComparatorFactory.class, 0);
PactRecordComparatorFactory.writeComparatorSetupToConfig(intermediateConfig.getConfigForOutputParameters(0),
new int[] { 0 }, new Class[] { PactLong.class }, new boolean[] { true });

JobTaskVertex diffPerVertex = JobGraphUtils.createTask(IterationIntermediatePactTask.class, "DiffPerVertex",
jobGraph, degreeOfParallelism, numSubTasksPerInstance);
Expand All @@ -101,15 +107,15 @@ public static void main(String[] args) throws Exception {
diffPerVertexConfig.setStubClass(DiffPerVertexMatch.class);
diffPerVertexConfig.setLocalStrategy(TaskConfig.LocalStrategy.HYBRIDHASH_FIRST);
PactRecordComparatorFactory.writeComparatorSetupToConfig(diffPerVertexConfig.getConfigForInputParameters(0),
new int[]{ 0 }, new Class[] { PactLong.class }, new boolean[] { true });
new int[] { 0 }, new Class[] { PactLong.class }, new boolean[] { true });
PactRecordComparatorFactory.writeComparatorSetupToConfig(diffPerVertexConfig.getConfigForInputParameters(1),
new int[] { 0 }, new Class[] { PactLong.class }, new boolean[] { true });
diffPerVertexConfig.setMemorySize(20 * JobGraphUtils.MEGABYTE);
diffPerVertexConfig.setComparatorFactoryForOutput(PactRecordComparatorFactory.class, 0);
PactRecordComparatorFactory.writeComparatorSetupToConfig(diffPerVertexConfig.getConfigForOutputParameters(0),
new int[] { 0 }, new Class[] { PactNull.class }, new boolean[] { true });

JobTaskVertex tail = JobGraphUtils.createTask(IterationTailPactTask.class, "BulkIterationTail", jobGraph,
JobTaskVertex tail = JobGraphUtils.createTask(IterationTailPactTask.class, "IterationTail", jobGraph,
degreeOfParallelism, numSubTasksPerInstance);
TaskConfig tailConfig = new TaskConfig(tail.getConfiguration());
tailConfig.setLocalStrategy(TaskConfig.LocalStrategy.SORT);
Expand All @@ -135,17 +141,17 @@ public static void main(String[] args) throws Exception {
numSubTasksPerInstance);

//TODO implicit order should be documented/configured somehow
JobGraphUtils.connect(pageWithRankInput, head, ChannelType.INMEMORY, DistributionPattern.POINTWISE,
ShipStrategy.ShipStrategyType.FORWARD);
JobGraphUtils.connect(pageWithRankInput, head, ChannelType.NETWORK, DistributionPattern.BIPARTITE,
ShipStrategy.ShipStrategyType.PARTITION_HASH);
JobGraphUtils.connect(head, intermediate, ChannelType.NETWORK, DistributionPattern.BIPARTITE,
ShipStrategy.ShipStrategyType.BROADCAST);
JobGraphUtils.connect(transitionMatrixInput, intermediate, ChannelType.NETWORK, DistributionPattern.BIPARTITE,
ShipStrategy.ShipStrategyType.PARTITION_HASH);
intermediateConfig.setGateIterativeWithNumberOfEventsUntilInterrupt(0, degreeOfParallelism);

JobGraphUtils.connect(intermediate, tail, ChannelType.NETWORK, DistributionPattern.POINTWISE,
ShipStrategy.ShipStrategyType.FORWARD);
tailConfig.setGateIterativeWithNumberOfEventsUntilInterrupt(0, 1);
JobGraphUtils.connect(intermediate, tail, ChannelType.NETWORK, DistributionPattern.BIPARTITE,
ShipStrategy.ShipStrategyType.PARTITION_HASH);
tailConfig.setGateIterativeWithNumberOfEventsUntilInterrupt(0, degreeOfParallelism);

JobGraphUtils.connect(head, diffPerVertex, ChannelType.NETWORK, DistributionPattern.BIPARTITE,
ShipStrategy.ShipStrategyType.BROADCAST);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,6 @@
import eu.stratosphere.pact.common.generic.types.TypePairComparatorFactory;
import eu.stratosphere.pact.common.generic.types.TypeSerializer;
import eu.stratosphere.pact.common.stubs.Collector;
import eu.stratosphere.pact.common.type.PactRecord;
import eu.stratosphere.pact.common.type.base.PactLong;
import eu.stratosphere.pact.common.util.InstantiationUtil;
import eu.stratosphere.pact.common.util.MutableObjectIterator;
import eu.stratosphere.pact.runtime.hash.MutableHashTable;
Expand Down Expand Up @@ -115,7 +113,7 @@ public void prepare() throws Exception {

@Override
public void run() throws Exception {

//TODO configure build and probeside index
final GenericMatcher<IT1, IT2, OT> matchStub = taskContext.getStub();
//TODO type safety
final Collector<OT> collector = taskContext.getOutputCollector();
Expand All @@ -132,7 +130,6 @@ public void run() throws Exception {
final IT2 buildSideRecord = taskContext.<IT2>getInputSerializer(1).createInstance();

while (running && probeSide.next(probeSideRecord)) {
//System.out.println("#######################PROBING FOR " + ((PactRecord) probeSideRecord).getField(0, PactLong.class).getValue());
MutableHashTable.HashBucketIterator<IT2, IT1> bucket = hashJoin.getMatchesFor(probeSideRecord);
while (bucket.next(buildSideRecord)) {
matchStub.match(probeSideRecord, buildSideRecord, collector);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,10 +109,10 @@ public boolean nextKey() throws IOException
// try to move to next key.
// Required if user code / reduce() method did not read the whole value iterator.
while (true) {
if (this.iterator.next(this.current)) {
if (!this.done && this.iterator.next(this.current)) {
if (!this.comparator.equalToReference(this.current)) {
// the keys do not match, so we have a new group. store the current keys
this.comparator.setReference(this.current);
this.comparator.setReference(this.current);
this.lookAheadHasNext = false;
this.valuesIterator.currentIsUnconsumed = true;
return true;
Expand Down

0 comments on commit 1e5b9cd

Please sign in to comment.