[FLINK-4896] [gelly] PageRank algorithm for directed graphs
Adds a PageRank algorithm using Flink transformations that handles source
and sink vertices (in- or out-degree of zero). The scatter-gather and
gather-sum-apply PageRank implementations are moved to Gelly examples.

This closes apache#2733
greghogan committed Mar 2, 2017
1 parent 438276d commit ea14053
Showing 9 changed files with 822 additions and 73 deletions.
46 changes: 23 additions & 23 deletions docs/dev/libs/gelly/library_methods.md
@@ -143,29 +143,6 @@ The constructor takes one parameter:

* `maxIterations`: the maximum number of iterations to run.

## PageRank

#### Overview
An implementation of a simple [PageRank algorithm](https://en.wikipedia.org/wiki/PageRank), using [scatter-gather iterations](#scatter-gather-iterations).
PageRank is an algorithm that was first used to rank web search engine results. Today, the algorithm and many variations are used in various graph application domains. The idea of PageRank is that important or relevant pages tend to link to other important pages.

#### Details
The algorithm operates in iterations, where pages distribute their scores to their neighbors (pages they have links to) and subsequently update their scores based on the partial values they receive. The implementation assumes that each page has at least one incoming and one outgoing link.
In order to consider the importance of a link from one page to another, scores are divided by the total number of out-links of the source page. Thus, a page with 10 links will distribute 1/10 of its score to each neighbor, while a page with 100 links will distribute 1/100 of its score to each neighboring page. This process computes what are often called the transition probabilities, i.e. the probability that a surfer on one page will move to another given page. To correctly compute the transition probabilities, this implementation expects the edge values to be initialized to 1.0.

#### Usage
The algorithm takes as input a `Graph` with any vertex type, `Double` vertex values, and `Double` edge values. Edge values should be initialized to 1.0 in order to correctly compute the transition probabilities; otherwise, the transition probability for an edge `(u, v)` will be set to the edge value divided by `u`'s out-degree. The algorithm returns a `DataSet` of vertices, where the vertex value corresponds to the rank assigned after convergence (or after the maximum number of iterations).
The constructors take the following parameters (a usage sketch follows the list):

* `beta`: the damping factor.
* `maxIterations`: the maximum number of iterations to run.

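A minimal usage sketch (the names are illustrative; `graph` is assumed to be a `Graph<Long, Double, Double>` with edge values initialized to 1.0):

```java
// Sketch: run scatter-gather PageRank with damping factor 0.85
// and at most 20 iterations.
DataSet<Vertex<Long, Double>> ranks =
    graph.run(new PageRank<Long>(0.85, 20));
```
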
## GSA PageRank

The algorithm is implemented using [gather-sum-apply iterations](#gather-sum-apply-iterations).

See the [PageRank](#pagerank) library method for implementation details and usage information.

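A sketch of the gather-sum-apply variant under the same assumptions (a `Graph<Long, Double, Double>` named `graph`; the constructor is assumed to mirror the scatter-gather version):

```java
// Sketch: the GSA variant with the same damping factor and iteration limit.
DataSet<Vertex<Long, Double>> ranks =
    graph.run(new GSAPageRank<Long>(0.85, 20));
```
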
## Single Source Shortest Paths

#### Overview
@@ -353,6 +330,29 @@ The algorithm takes a directed graph as input and outputs a `DataSet` of `Tuple3
and authority score. Termination is configured with a maximum number of iterations and/or a convergence threshold
on the sum of the change in each score for each vertex between iterations.

* `setParallelism`: override the operator parallelism

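As a sketch, running HITS for a fixed number of iterations on a directed `Graph<Long, NullValue, NullValue>` named `graph` (the `HITS(int iterations)` constructor appears in the diff further below; the variable names are assumptions):

```java
// Sketch: compute hub and authority scores with at most 10 iterations.
DataSet<HITS.Result<Long>> scores =
    graph.run(new HITS<Long, NullValue, NullValue>(10));
```
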
### PageRank

#### Overview
[PageRank](https://en.wikipedia.org/wiki/PageRank) is an algorithm that was first used to rank web search engine
results. Today, the algorithm and many variations are used in various graph application domains. The idea of PageRank is
that important or relevant vertices tend to link to other important vertices.

#### Details
The algorithm operates in iterations, where pages distribute their scores to their neighbors (pages they have links to)
and subsequently update their scores based on the sum of values they receive. In order to consider the importance of a
link from one page to another, scores are divided by the total number of out-links of the source page. Thus, a page with
10 links will distribute 1/10 of its score to each neighbor, while a page with 100 links will distribute 1/100 of its
score to each neighboring page.
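
For reference, a textbook statement of the per-iteration update, with damping factor $\beta$ and vertex count $N$ (this is the general formula, not text from the Flink source; per the commit message, this implementation additionally handles source and sink vertices):

$$PR(v) = \frac{1 - \beta}{N} + \beta \sum_{u \in \text{in}(v)} \frac{PR(u)}{\text{outDegree}(u)}$$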

#### Usage
The algorithm takes a directed graph as input and outputs a `DataSet` where each `Result` contains the vertex ID and
PageRank score. Termination is configured with a maximum number of iterations and/or a convergence threshold
on the sum of the change in score for each vertex between iterations.

* `setParallelism`: override the operator parallelism

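A hedged usage sketch, assuming a directed `Graph<Long, NullValue, NullValue>` named `graph` and a constructor taking the damping factor and an iteration limit (the signature is not shown in this hunk, so treat it as an assumption):

```java
// Sketch: PageRank with damping factor 0.85 and at most 20 iterations.
DataSet<PageRank.Result<Long>> ranks =
    graph.run(new PageRank<Long, NullValue, NullValue>(0.85, 20));
```
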
## Metric

### Vertex Metrics
@@ -16,7 +16,7 @@
* limitations under the License.
*/

package org.apache.flink.graph.library;
package org.apache.flink.graph.examples;

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple2;
@@ -16,7 +16,7 @@
* limitations under the License.
*/

package org.apache.flink.graph.library;
package org.apache.flink.graph.examples;

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple2;
@@ -16,12 +16,14 @@
* limitations under the License.
*/

package org.apache.flink.graph.library;
package org.apache.flink.graph.test.examples;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.Vertex;
import org.apache.flink.graph.examples.GSAPageRank;
import org.apache.flink.graph.examples.PageRank;
import org.apache.flink.graph.examples.data.PageRankData;
import org.apache.flink.test.util.MultipleProgramsTestBase;
import org.junit.Assert;
@@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.graph.library.link_analysis;

import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.types.DoubleValue;

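/**
 * Utility functions shared by the link analysis algorithms.
 */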
class Functions {

/**
* Sum vertices' scores.
*
* @param <T> ID type
*/
@ForwardedFields("0")
static class SumScore<T>
implements ReduceFunction<Tuple2<T, DoubleValue>> {
@Override
public Tuple2<T, DoubleValue> reduce(Tuple2<T, DoubleValue> left, Tuple2<T, DoubleValue> right)
throws Exception {
left.f1.setValue(left.f1.getValue() + right.f1.getValue());
return left;
}
}
}
@@ -21,7 +21,6 @@
import org.apache.flink.api.common.aggregators.ConvergenceCriterion;
import org.apache.flink.api.common.aggregators.DoubleSumAggregator;
import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.functions.RichJoinFunction;
@@ -38,6 +37,7 @@
import org.apache.flink.graph.Edge;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.Vertex;
import org.apache.flink.graph.library.link_analysis.Functions.SumScore;
import org.apache.flink.graph.library.link_analysis.HITS.Result;
import org.apache.flink.graph.utils.Murmur3_32;
import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingDataSet;
@@ -50,15 +50,15 @@
import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;

/**
* http://www.cs.cornell.edu/home/kleinber/auth.pdf
*
* Hyperlink-Induced Topic Search computes two interdependent scores for every
* vertex in a directed graph. A good "hub" links to good "authorities" and
* good "authorities" are linked from good "hubs".
*
* This algorithm can be configured to terminate either by a limit on the number
* of iterations, a convergence threshold, or both.
*
* http://www.cs.cornell.edu/home/kleinber/auth.pdf
*
* @param <K> graph ID type
* @param <VV> vertex value type
* @param <EV> edge value type
@@ -91,7 +91,7 @@ public HITS(int iterations) {

/**
* Hyperlink-Induced Topic Search with a convergence threshold. The algorithm
* terminates When the total change in hub and authority scores over all
* terminates when the total change in hub and authority scores over all
* vertices falls to or below the given threshold value.
*
* @param convergenceThreshold convergence threshold for sum of scores
@@ -154,13 +154,12 @@ protected boolean mergeConfiguration(GraphAlgorithmWrappingDataSet other) {
return true;
}


@Override
public DataSet<Result<K>> runInternal(Graph<K, VV, EV> input)
throws Exception {
DataSet<Tuple2<K, K>> edges = input
.getEdges()
.flatMap(new ExtractEdgeIDs<K, EV>())
.map(new ExtractEdgeIDs<K, EV>())
.setParallelism(parallelism)
.name("Extract edge IDs");

@@ -270,15 +269,15 @@ public DataSet<Result<K>> runInternal(Graph<K, VV, EV> input)
*/
@ForwardedFields("0; 1")
private static class ExtractEdgeIDs<T, ET>
implements FlatMapFunction<Edge<T, ET>, Tuple2<T, T>> {
implements MapFunction<Edge<T, ET>, Tuple2<T, T>> {
private Tuple2<T, T> output = new Tuple2<>();

@Override
public void flatMap(Edge<T, ET> value, Collector<Tuple2<T, T>> out)
public Tuple2<T, T> map(Edge<T, ET> value)
throws Exception {
output.f0 = value.f0;
output.f1 = value.f1;
out.collect(output);
return output;
}
}

@@ -308,8 +307,7 @@ public Tuple3<T, DoubleValue, DoubleValue> map(Tuple2<T, T> value) throws Except
*
* @param <T> ID type
*/
@ForwardedFieldsFirst("0")
@ForwardedFieldsSecond("0")
@ForwardedFields("0")
private static class SumScores<T>
implements ReduceFunction<Tuple3<T, DoubleValue, DoubleValue>> {
@Override
@@ -344,23 +342,6 @@ public void coGroup(Iterable<Tuple3<T, DoubleValue, DoubleValue>> vertex, Iterab
}
}

/**
* Sum vertices' scores.
*
* @param <T> ID type
*/
@ForwardedFieldsFirst("0")
@ForwardedFieldsSecond("0")
private static class SumScore<T>
implements ReduceFunction<Tuple2<T, DoubleValue>> {
@Override
public Tuple2<T, DoubleValue> reduce(Tuple2<T, DoubleValue> left, Tuple2<T, DoubleValue> right)
throws Exception {
left.f1.setValue(left.f1.getValue() + right.f1.getValue());
return left;
}
}

/**
* The authority score is the sum of hub scores of vertices on in-edges.
*
@@ -469,15 +450,17 @@ private static class ChangeInScores<T>
private double changeInScores;

@Override
public void open(Configuration parameters) throws Exception {
public void open(Configuration parameters)
throws Exception {
super.open(parameters);

isInitialSuperstep = (getIterationRuntimeContext().getSuperstepNumber() == 1);
changeInScores = (isInitialSuperstep) ? -1.0 : 0.0;
}

@Override
public void close() throws Exception {
public void close()
throws Exception {
super.close();

DoubleSumAggregator agg = getIterationRuntimeContext().getIterationAggregator(CHANGE_IN_SCORES);
Expand All @@ -498,8 +481,8 @@ public Tuple3<T, DoubleValue, DoubleValue> join(Tuple3<T, DoubleValue, DoubleVal

/**
* Monitors the total change in hub and authority scores over all vertices.
* The iteration terminates when the change in scores compared against the
* prior iteration falls below the given convergence threshold.
* The algorithm terminates when the change in scores compared against the
* prior iteration falls to or below the given convergence threshold.
*
* An optimization of this implementation of HITS is to leave the initial
* scores non-normalized; therefore, the change in scores after the first