Skip to content

Commit

Permalink
convergence checking via sync is now working
Browse files Browse the repository at this point in the history
  • Loading branch information
asubmissions committed Aug 7, 2012
1 parent 7272531 commit 4189b70
Show file tree
Hide file tree
Showing 16 changed files with 467 additions and 54 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/***********************************************************************************************************************
*
* Copyright (C) 2012 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/

package eu.stratosphere.pact.runtime.iterative.convergence;

import eu.stratosphere.pact.common.type.PactRecord;

public interface ConvergenceCriterion {

void prepareForNextIteration();

void analyze(PactRecord record);

boolean isConverged();
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,32 +15,42 @@

package eu.stratosphere.pact.runtime.iterative.io;

import eu.stratosphere.nephele.io.AbstractRecordWriter;
import eu.stratosphere.nephele.services.memorymanager.DataOutputView;
import eu.stratosphere.nephele.types.Record;
import eu.stratosphere.pact.common.generic.types.TypeSerializer;
import eu.stratosphere.pact.common.stubs.Collector;

import java.io.IOException;
import java.util.List;

/** {@link Collector} to write to a {@link DataOutputView} */
public class DataOutputCollector<T> implements Collector<T> {
public class DataOutputCollector<T extends Record> implements Collector<T> {

/** {@link DataOutputView} to write to */
private final DataOutputView outputView;
/** serializer to use */
private final TypeSerializer<T> typeSerializer;
private final List<AbstractRecordWriter<T>> writers;

public DataOutputCollector(DataOutputView outputView, TypeSerializer<T> typeSerializer) {
public DataOutputCollector(DataOutputView outputView, TypeSerializer<T> typeSerializer, List<AbstractRecordWriter<T>> writers) {
this.outputView = outputView;
this.typeSerializer = typeSerializer;
this.writers = writers;
}

@Override
public void collect(T record) {
try {
typeSerializer.serialize(record, outputView);
for (AbstractRecordWriter<T> writer : writers) {
writer.emit(record);
}
}
catch (IOException e) {
throw new RuntimeException("Unable to serialize the record", e);
} catch (InterruptedException e) {
throw new RuntimeException("Unable to serialize the record", e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,16 +41,18 @@ public class InterruptingMutableObjectIterator<E> implements MutableObjectIterat
private final AtomicInteger endOfSuperstepEventCounter;
private final AtomicInteger terminationEventCounter;
private final Terminable owningIterativeTask;
private final int gateIndex;

private static final Log log = LogFactory.getLog(InterruptingMutableObjectIterator.class);

public InterruptingMutableObjectIterator(MutableObjectIterator<E> delegate, int numberOfEventsUntilInterrupt,
String name, Terminable owningIterativeTask) {
String name, Terminable owningIterativeTask, int gateIndex) {
Preconditions.checkArgument(numberOfEventsUntilInterrupt > 0);
this.delegate = delegate;
this.numberOfEventsUntilInterrupt = numberOfEventsUntilInterrupt;
this.name = name;
this.owningIterativeTask = owningIterativeTask;
this.gateIndex = gateIndex;

endOfSuperstepEventCounter = new AtomicInteger(0);
terminationEventCounter = new AtomicInteger(0);
Expand All @@ -75,7 +77,8 @@ public void eventOccurred(AbstractTaskEvent event) {
private void onTermination() {
int numberOfEventsSeen = terminationEventCounter.incrementAndGet();
if (log.isInfoEnabled()) {
log.info("InterruptibleIterator of " + name + " received Termination event (" + numberOfEventsSeen +")");
log.info("InterruptibleIterator of " + name + " on gate [" + gateIndex +"] received Termination event (" +
numberOfEventsSeen +")");
}

Preconditions.checkState(numberOfEventsSeen <= numberOfEventsUntilInterrupt);
Expand All @@ -88,7 +91,8 @@ private void onTermination() {
private void onEndOfSuperstep() {
int numberOfEventsSeen = endOfSuperstepEventCounter.incrementAndGet();
if (log.isInfoEnabled()) {
log.info("InterruptibleIterator of " + name + " received EndOfSuperstep event (" + numberOfEventsSeen +")");
log.info("InterruptibleIterator of " + name + " on gate [" + gateIndex + "] received EndOfSuperstep event (" +
numberOfEventsSeen +")");
}

if (numberOfEventsSeen % numberOfEventsUntilInterrupt == 0) {
Expand All @@ -111,7 +115,7 @@ public boolean next(E target) throws IOException {
// } else {

if (!recordFound && log.isInfoEnabled()) {
log.info("InterruptibleIterator of " + name + " releases input");
log.info("InterruptibleIterator of " + name + " on gate [" + gateIndex + "] releases input");
}
return recordFound;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/***********************************************************************************************************************
*
* Copyright (C) 2012 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/

package eu.stratosphere.pact.runtime.iterative.playing.pagerank;

import eu.stratosphere.pact.common.stubs.Collector;
import eu.stratosphere.pact.common.stubs.MatchStub;
import eu.stratosphere.pact.common.type.PactRecord;
import eu.stratosphere.pact.common.type.base.PactDouble;
import eu.stratosphere.pact.common.type.base.PactLong;

public class DiffPerVertexMatch extends MatchStub {

@Override
public void match(PactRecord pageWithPreviousRank, PactRecord pageWithCurrentRank, Collector<PactRecord> out)
throws Exception {
long vertex = pageWithPreviousRank.getField(0, PactLong.class).getValue();

double previousPageRank = pageWithPreviousRank.getField(1, PactDouble.class).getValue();
double currentPageRank = pageWithCurrentRank.getField(1, PactDouble.class).getValue();

double diff = Math.abs(currentPageRank - previousPageRank);
//System.out.println(vertex + " " + previousPageRank + " --> " + currentPageRank + " (" + diff + ")");

PactRecord result = new PactRecord();
result.setField(0, new PactLong(vertex));
result.setField(1, new PactDouble(diff));

out.collect(result);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ public void match(PactRecord pageWithRank, PactRecord transitionMatrixEntry, Col
record.setField(0, new PactLong(vertexID));
record.setField(1, new PactDouble(rank * transitionProbability));

long source = transitionMatrixEntry.getField(0, PactLong.class).getValue();
System.out.println("Match from " + source + " to " + vertexID + ": " + rank + " * " + transitionProbability + " = " + (rank * transitionProbability));
//long source = transitionMatrixEntry.getField(0, PactLong.class).getValue();
//System.out.println("Match from " + source + " to " + vertexID + ": " + rank + " * " + transitionProbability + " = " + (rank * transitionProbability));

collector.collect(record);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,12 @@ public void reduce(Iterator<PactRecord> records, Collector<PactRecord> collector
PactRecord record = records.next();
accumulator.setField(0, record.getField(0, PactLong.class));
sum += record.getField(1, PactDouble.class).getValue();
System.out.println("\t" + record.getField(0, PactLong.class) + " " + record.getField(1, PactDouble.class));
//System.out.println("\t" + record.getField(0, PactLong.class) + " " + record.getField(1, PactDouble.class));
}

accumulator.setField(1, new PactDouble(sum));

System.out.println("Reduce: " + accumulator.getField(0, PactLong.class) + " " + sum);
//System.out.println("Reduce: " + accumulator.getField(0, PactLong.class) + " " + sum);

collector.collect(accumulator);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/***********************************************************************************************************************
*
* Copyright (C) 2012 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/

package eu.stratosphere.pact.runtime.iterative.playing.pagerank;

import eu.stratosphere.pact.common.type.PactRecord;
import eu.stratosphere.pact.common.type.base.PactDouble;
import eu.stratosphere.pact.runtime.iterative.convergence.ConvergenceCriterion;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

public class L1NormConvergenceCriterion implements ConvergenceCriterion {

private double sum;
private double numRecordsSeen;

private static final double EPSILON = 0.0001;

private static final Log log = LogFactory.getLog(L1NormConvergenceCriterion.class);

@Override
public void prepareForNextIteration() {
sum = 0;
numRecordsSeen = 0;
}

@Override
public void analyze(PactRecord record) {
double diff = record.getField(1, PactDouble.class).getValue();

sum += diff;
numRecordsSeen++;
}

@Override
public boolean isConverged() {
double error = sum / numRecordsSeen;

if (log.isInfoEnabled()) {
log.info("L1 norm of the current vector difference: " + error);
}

return error < EPSILON;
}
}
Loading

0 comments on commit 4189b70

Please sign in to comment.