Skip to content

Commit

Permalink
Adds PageRankBasic Scala example. Removes old Scala examples
Browse files Browse the repository at this point in the history
  • Loading branch information
fhueske authored and aljoscha committed Sep 22, 2014
1 parent 81e81b9 commit 0dc7614
Show file tree
Hide file tree
Showing 7 changed files with 240 additions and 394 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ public static void main(String[] args) throws Exception {
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// get input data
DataSet<Tuple1<Long>> pagesInput = getPagesDataSet(env);
DataSet<Long> pagesInput = getPagesDataSet(env);
DataSet<Tuple2<Long, Long>> linksInput = getLinksDataSet(env);

// assign initial rank to pages
Expand Down Expand Up @@ -138,16 +138,16 @@ public static void main(String[] args) throws Exception {
/**
* A map function that assigns an initial rank to all pages.
*/
public static final class RankAssigner implements MapFunction<Tuple1<Long>, Tuple2<Long, Double>> {
public static final class RankAssigner implements MapFunction<Long, Tuple2<Long, Double>> {
Tuple2<Long, Double> outPageWithRank;

public RankAssigner(double rank) {
this.outPageWithRank = new Tuple2<Long, Double>(-1l, rank);
}

@Override
public Tuple2<Long, Double> map(Tuple1<Long> page) {
outPageWithRank.f0 = page.f0;
public Tuple2<Long, Double> map(Long page) {
outPageWithRank.f0 = page;
return outPageWithRank;
}
}
Expand Down Expand Up @@ -259,12 +259,17 @@ private static boolean parseParameters(String[] args) {
return true;
}

private static DataSet<Tuple1<Long>> getPagesDataSet(ExecutionEnvironment env) {
private static DataSet<Long> getPagesDataSet(ExecutionEnvironment env) {
if(fileOutput) {
return env.readCsvFile(pagesInputPath)
.fieldDelimiter(' ')
.lineDelimiter("\n")
.types(Long.class);
return env
.readCsvFile(pagesInputPath)
.fieldDelimiter(' ')
.lineDelimiter("\n")
.types(Long.class)
.map(new MapFunction<Tuple1<Long>, Long>() {
@Override
public Long map(Tuple1<Long> v) { return v.f0; }
});
} else {
return PageRankData.getDefaultPagesDataSet(env);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,9 @@
import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple2;

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;

/**
* Provides the default data sets used for the PageRank example program.
Expand All @@ -35,52 +33,51 @@
*/
public class PageRankData {

public static final Object[][] EDGES = {
{1L, 2L},
{1L, 15L},
{2L, 3L},
{2L, 4L},
{2L, 5L},
{2L, 6L},
{2L, 7L},
{3L, 13L},
{4L, 2L},
{5L, 11L},
{5L, 12L},
{6L, 1L},
{6L, 7L},
{6L, 8L},
{7L, 1L},
{7L, 8L},
{8L, 1L},
{8L, 9L},
{8L, 10L},
{9L, 14L},
{9L, 1L},
{10L, 1L},
{10L, 13L},
{11L, 12L},
{11L, 1L},
{12L, 1L},
{13L, 14L},
{14L, 12L},
{15L, 1L},
};

private static long numPages = 15;

public static DataSet<Tuple1<Long>> getDefaultPagesDataSet(ExecutionEnvironment env) {

List<Tuple1<Long>> data = new ArrayList<Tuple1<Long>>();
public static DataSet<Tuple2<Long, Long>> getDefaultEdgeDataSet(ExecutionEnvironment env) {

for(long i=0; i<numPages; i++) {
data.add(new Tuple1<Long>(i));
List<Tuple2<Long, Long>> edges = new ArrayList<Tuple2<Long, Long>>();
for(Object[] e : EDGES) {
edges.add(new Tuple2<Long, Long>((Long)e[0], (Long)e[1]));
}
return env.fromCollection(data);
return env.fromCollection(edges);
}

public static DataSet<Tuple2<Long, Long>> getDefaultEdgeDataSet(ExecutionEnvironment env) {

List<Tuple2<Long, Long>> data = new ArrayList<Tuple2<Long, Long>>();
data.add(new Tuple2<Long, Long>(1L, 2L));
data.add(new Tuple2<Long, Long>(1L, 15L));
data.add(new Tuple2<Long, Long>(2L, 3L));
data.add(new Tuple2<Long, Long>(2L, 4L));
data.add(new Tuple2<Long, Long>(2L, 5L));
data.add(new Tuple2<Long, Long>(2L, 6L));
data.add(new Tuple2<Long, Long>(2L, 7L));
data.add(new Tuple2<Long, Long>(3L, 13L));
data.add(new Tuple2<Long, Long>(4L, 2L));
data.add(new Tuple2<Long, Long>(5L, 11L));
data.add(new Tuple2<Long, Long>(5L, 12L));
data.add(new Tuple2<Long, Long>(6L, 1L));
data.add(new Tuple2<Long, Long>(6L, 7L));
data.add(new Tuple2<Long, Long>(6L, 8L));
data.add(new Tuple2<Long, Long>(7L, 1L));
data.add(new Tuple2<Long, Long>(7L, 8L));
data.add(new Tuple2<Long, Long>(8L, 1L));
data.add(new Tuple2<Long, Long>(8L, 9L));
data.add(new Tuple2<Long, Long>(8L, 10L));
data.add(new Tuple2<Long, Long>(9L, 14L));
data.add(new Tuple2<Long, Long>(9L, 1L));
data.add(new Tuple2<Long, Long>(10L, 1L));
data.add(new Tuple2<Long, Long>(10L, 13L));
data.add(new Tuple2<Long, Long>(11L, 12L));
data.add(new Tuple2<Long, Long>(11L, 1L));
data.add(new Tuple2<Long, Long>(12L, 1L));
data.add(new Tuple2<Long, Long>(13L, 14L));
data.add(new Tuple2<Long, Long>(14L, 12L));
data.add(new Tuple2<Long, Long>(15L, 1L));

return env.fromCollection(data);
public static DataSet<Long> getDefaultPagesDataSet(ExecutionEnvironment env) {
return env.generateSequence(1, 15);
}

public static long getNumberOfPages() {
Expand Down

This file was deleted.

This file was deleted.

Loading

0 comments on commit 0dc7614

Please sign in to comment.