Skip to content

Commit

Permalink
[FLINK-7019] [gelly] Rework parallelism in Gelly algorithms and examples
Browse files Browse the repository at this point in the history
Flink job parallelism is set with ExecutionConfig#setParallelism or with
-p on the command-line. The Gelly algorithms JaccardIndex, AdamicAdar,
TriangleListing, and ClusteringCoefficient have intermediate operators
which generate output quadratic in the size of input. These algorithms
may need to be run with a high parallelism but doing so for all
operations is wasteful. Thus was introduced "little parallelism".

This can be simplified by moving the parallelism parameter to the new
common base class with the rule-of-thumb to use the algorithm
parallelism for all normal (small output) operators. The asymptotically
large operators will default to the job parallelism, as will the default
algorithm parallelism.

This closes apache#4282
  • Loading branch information
greghogan committed Jul 11, 2017
1 parent 273223f commit d0cc2c1
Show file tree
Hide file tree
Showing 62 changed files with 293 additions and 1,095 deletions.
14 changes: 7 additions & 7 deletions docs/dev/libs/gelly/library_methods.md
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ Directed and undirected variants are provided. The analytics take a simple graph
containing the total number of vertices and average clustering coefficient of the graph. The graph ID type must be
`Comparable` and `Copyable`.

* `setLittleParallelism`: override the parallelism of operators processing small amounts of data
* `setParallelism`: override the parallelism of operators processing small amounts of data

### Global Clustering Coefficient

Expand All @@ -244,7 +244,7 @@ Directed and undirected variants are provided. The analytics take a simple graph
containing the total number of triplets and triangles in the graph. The result class provides a method to compute the
global clustering coefficient score. The graph ID type must be `Comparable` and `Copyable`.

* `setLittleParallelism`: override the parallelism of operators processing small amounts of data
* `setParallelism`: override the parallelism of operators processing small amounts of data

### Local Clustering Coefficient

Expand All @@ -266,7 +266,7 @@ provides a method to compute the local clustering coefficient score. The graph I
`Copyable`.

* `setIncludeZeroDegreeVertices`: include results for vertices with a degree of zero
* `setLittleParallelism`: override the parallelism of operators processing small amounts of data
* `setParallelism`: override the parallelism of operators processing small amounts of data

### Triadic Census

Expand All @@ -286,7 +286,7 @@ Directed and undirected variants are provided. The analytics take a simple graph
`AnalyticResult` with accessor methods for querying the count of each triad type. The graph ID type must be
`Comparable` and `Copyable`.

* `setLittleParallelism`: override the parallelism of operators processing small amounts of data
* `setParallelism`: override the parallelism of operators processing small amounts of data

### Triangle Listing

Expand All @@ -306,7 +306,7 @@ Directed and undirected variants are provided. The algorithms take a simple grap
`TertiaryResult` containing the three triangle vertices and, for the directed algorithm, a bitmask marking each of the
six potential edges connecting the three vertices. The graph ID type must be `Comparable` and `Copyable`.

* `setLittleParallelism`: override the parallelism of operators processing small amounts of data
* `setParallelism`: override the parallelism of operators processing small amounts of data
* `setSortTriangleVertices`: normalize the triangle listing such that for each result (K0, K1, K2) the vertex IDs are sorted K0 < K1 < K2

## Link Analysis
Expand Down Expand Up @@ -424,9 +424,9 @@ See the [Jaccard Index](#jaccard-index) library method for a similar algorithm.
The algorithm takes a simple undirected graph as input and outputs a `DataSet` of `BinaryResult` containing two vertex
IDs and the Adamic-Adar similarity score. The graph ID type must be `Copyable`.

* `setLittleParallelism`: override the parallelism of operators processing small amounts of data
* `setMinimumRatio`: filter out Adamic-Adar scores less than the given ratio times the average score
* `setMinimumScore`: filter out Adamic-Adar scores less than the given minimum
* `setParallelism`: override the parallelism of operators processing small amounts of data

### Jaccard Index

Expand All @@ -448,8 +448,8 @@ The algorithm takes a simple undirected graph as input and outputs a `DataSet` o
the number of shared neighbors, and the number of distinct neighbors. The result class provides a method to compute the
Jaccard Index score. The graph ID type must be `Copyable`.

* `setLittleParallelism`: override the parallelism of operators processing small amounts of data
* `setMaximumScore`: filter out Jaccard Index scores greater than or equal to the given maximum fraction
* `setMinimumScore`: filter out Jaccard Index scores less than the given minimum fraction
* `setParallelism`: override the parallelism of operators processing small amounts of data

{% top %}
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,11 @@
import org.apache.flink.api.java.DataSet;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.drivers.parameter.DoubleParameter;
import org.apache.flink.graph.drivers.parameter.LongParameter;
import org.apache.flink.types.CopyableValue;

import org.apache.commons.lang3.text.StrBuilder;
import org.apache.commons.lang3.text.WordUtils;

import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;

/**
* Driver for {@link org.apache.flink.graph.library.similarity.AdamicAdar}.
*/
Expand All @@ -43,9 +40,6 @@ public class AdamicAdar<K extends CopyableValue<K>, VV, EV>
.setDefaultValue(0.0)
.setMinimumValue(0.0, true);

private LongParameter littleParallelism = new LongParameter(this, "little_parallelism")
.setDefaultValue(PARALLELISM_DEFAULT);

@Override
public String getShortDescription() {
return "similarity score weighted by centerpoint degree";
Expand All @@ -64,12 +58,10 @@ public String getLongDescription() {

@Override
public DataSet plan(Graph<K, VV, EV> graph) throws Exception {
int lp = littleParallelism.getValue().intValue();

return graph
.run(new org.apache.flink.graph.library.similarity.AdamicAdar<K, VV, EV>()
.setMinimumRatio(minRatio.getValue().floatValue())
.setMinimumScore(minScore.getValue().floatValue())
.setLittleParallelism(lp));
.setParallelism(parallelism.getValue().intValue()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,13 @@
import org.apache.flink.graph.GraphAnalytic;
import org.apache.flink.graph.asm.result.PrintableResult;
import org.apache.flink.graph.drivers.parameter.ChoiceParameter;
import org.apache.flink.graph.drivers.parameter.LongParameter;
import org.apache.flink.types.CopyableValue;

import org.apache.commons.lang3.text.StrBuilder;
import org.apache.commons.lang3.text.WordUtils;

import java.io.PrintStream;

import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;

/**
* Driver for directed and undirected clustering coefficient algorithm and analytics.
*
Expand All @@ -53,9 +50,6 @@ public class ClusteringCoefficient<K extends Comparable<K> & CopyableValue<K>, V
private ChoiceParameter order = new ChoiceParameter(this, "order")
.addChoices(DIRECTED, UNDIRECTED);

private LongParameter littleParallelism = new LongParameter(this, "little_parallelism")
.setDefaultValue(PARALLELISM_DEFAULT);

private GraphAnalytic<K, VV, EV, ? extends PrintableResult> globalClusteringCoefficient;

private GraphAnalytic<K, VV, EV, ? extends PrintableResult> averageClusteringCoefficient;
Expand All @@ -81,37 +75,37 @@ public String getLongDescription() {

@Override
public DataSet plan(Graph<K, VV, EV> graph) throws Exception {
int lp = littleParallelism.getValue().intValue();
int parallelism = this.parallelism.getValue().intValue();

switch (order.getValue()) {
case DIRECTED:
globalClusteringCoefficient = graph
.run(new org.apache.flink.graph.library.clustering.directed.GlobalClusteringCoefficient<K, VV, EV>()
.setLittleParallelism(lp));
.setParallelism(parallelism));

averageClusteringCoefficient = graph
.run(new org.apache.flink.graph.library.clustering.directed.AverageClusteringCoefficient<K, VV, EV>()
.setLittleParallelism(lp));
.setParallelism(parallelism));

@SuppressWarnings("unchecked")
DataSet<PrintableResult> directedResult = (DataSet<PrintableResult>) (DataSet<?>) graph
.run(new org.apache.flink.graph.library.clustering.directed.LocalClusteringCoefficient<K, VV, EV>()
.setLittleParallelism(lp));
.setParallelism(parallelism));
return directedResult;

case UNDIRECTED:
globalClusteringCoefficient = graph
.run(new org.apache.flink.graph.library.clustering.undirected.GlobalClusteringCoefficient<K, VV, EV>()
.setLittleParallelism(lp));
.setParallelism(parallelism));

averageClusteringCoefficient = graph
.run(new org.apache.flink.graph.library.clustering.undirected.AverageClusteringCoefficient<K, VV, EV>()
.setLittleParallelism(lp));
.setParallelism(parallelism));

@SuppressWarnings("unchecked")
DataSet<PrintableResult> undirectedResult = (DataSet<PrintableResult>) (DataSet<?>) graph
.run(new org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient<K, VV, EV>()
.setLittleParallelism(lp));
.setParallelism(parallelism));
return undirectedResult;

default:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,13 @@

package org.apache.flink.graph.drivers;

import org.apache.flink.graph.drivers.parameter.LongParameter;
import org.apache.flink.graph.drivers.parameter.ParameterizedBase;

import java.io.PrintStream;

import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;

/**
* Base class for example drivers.
*
Expand All @@ -33,6 +36,9 @@ public abstract class DriverBase<K, VV, EV>
extends ParameterizedBase
implements Driver<K, VV, EV> {

protected LongParameter parallelism = new LongParameter(this, "__parallelism")
.setDefaultValue(PARALLELISM_DEFAULT);

@Override
public String getName() {
return this.getClass().getSimpleName();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,8 @@ public DataSet plan(Graph<K, VV, EV> graph) throws Exception {
if (hasNullValueEdges(edges)) {
return edges
.map(new EdgeToTuple2Map<K, EV>())
.name("Edge to Tuple2");
.name("Edge to Tuple2")
.setParallelism(parallelism.getValue().intValue());
} else {
return edges;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,18 +85,22 @@ public DataSet plan(Graph<K, VV, EV> graph) throws Exception {
switch (order.getValue()) {
case DIRECTED:
vertexMetrics = graph
.run(new org.apache.flink.graph.library.metric.directed.VertexMetrics<K, VV, EV>());
.run(new org.apache.flink.graph.library.metric.directed.VertexMetrics<K, VV, EV>()
.setParallelism(parallelism.getValue().intValue()));

edgeMetrics = graph
.run(new org.apache.flink.graph.library.metric.directed.EdgeMetrics<K, VV, EV>());
.run(new org.apache.flink.graph.library.metric.directed.EdgeMetrics<K, VV, EV>()
.setParallelism(parallelism.getValue().intValue()));
break;

case UNDIRECTED:
vertexMetrics = graph
.run(new org.apache.flink.graph.library.metric.undirected.VertexMetrics<K, VV, EV>());
.run(new org.apache.flink.graph.library.metric.undirected.VertexMetrics<K, VV, EV>()
.setParallelism(parallelism.getValue().intValue()));

edgeMetrics = graph
.run(new org.apache.flink.graph.library.metric.undirected.EdgeMetrics<K, VV, EV>());
.run(new org.apache.flink.graph.library.metric.undirected.EdgeMetrics<K, VV, EV>()
.setParallelism(parallelism.getValue().intValue()));
break;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,8 @@ public String getLongDescription() {
public DataSet plan(Graph<K, VV, EV> graph) throws Exception {
return graph
.run(new org.apache.flink.graph.library.linkanalysis.HITS<K, VV, EV>(
iterationConvergence.getValue().iterations,
iterationConvergence.getValue().convergenceThreshold));
iterationConvergence.getValue().iterations,
iterationConvergence.getValue().convergenceThreshold)
.setParallelism(parallelism.getValue().intValue()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,6 @@
import org.apache.commons.lang3.text.StrBuilder;
import org.apache.commons.lang3.text.WordUtils;

import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;

/**
* Driver for {@link org.apache.flink.graph.library.similarity.JaccardIndex}.
*/
Expand All @@ -53,9 +51,6 @@ public class JaccardIndex<K extends CopyableValue<K>, VV, EV>

private BooleanParameter mirrorResults = new BooleanParameter(this, "mirror_results");

private LongParameter littleParallelism = new LongParameter(this, "little_parallelism")
.setDefaultValue(PARALLELISM_DEFAULT);

@Override
public String getShortDescription() {
return "similarity score as fraction of common neighbors";
Expand All @@ -76,13 +71,11 @@ public String getLongDescription() {

@Override
public DataSet plan(Graph<K, VV, EV> graph) throws Exception {
int lp = littleParallelism.getValue().intValue();

return graph
.run(new org.apache.flink.graph.library.similarity.JaccardIndex<K, VV, EV>()
.setMinimumScore(minNumerator.getValue().intValue(), minDenominator.getValue().intValue())
.setMaximumScore(maxNumerator.getValue().intValue(), maxDenominator.getValue().intValue())
.setMirrorResults(mirrorResults.getValue())
.setLittleParallelism(lp));
.setParallelism(parallelism.getValue().intValue()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,9 @@ public String getLongDescription() {
public DataSet plan(Graph<K, VV, EV> graph) throws Exception {
return graph
.run(new org.apache.flink.graph.library.linkanalysis.PageRank<K, VV, EV>(
dampingFactor.getValue(),
iterationConvergence.getValue().iterations,
iterationConvergence.getValue().convergenceThreshold));
dampingFactor.getValue(),
iterationConvergence.getValue().iterations,
iterationConvergence.getValue().convergenceThreshold)
.setParallelism(parallelism.getValue().intValue()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,16 +24,13 @@
import org.apache.flink.graph.asm.result.PrintableResult;
import org.apache.flink.graph.drivers.parameter.BooleanParameter;
import org.apache.flink.graph.drivers.parameter.ChoiceParameter;
import org.apache.flink.graph.drivers.parameter.LongParameter;
import org.apache.flink.types.CopyableValue;

import org.apache.commons.lang3.text.StrBuilder;
import org.apache.commons.lang3.text.WordUtils;

import java.io.PrintStream;

import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;

/**
* Driver for directed and undirected triangle listing algorithm and analytic.
*
Expand All @@ -56,9 +53,6 @@ public class TriangleListing<K extends Comparable<K> & CopyableValue<K>, VV, EV>

private BooleanParameter computeTriadicCensus = new BooleanParameter(this, "triadic_census");

private LongParameter littleParallelism = new LongParameter(this, "little_parallelism")
.setDefaultValue(PARALLELISM_DEFAULT);

private GraphAnalytic<K, VV, EV, ? extends PrintableResult> triadicCensus;

@Override
Expand All @@ -79,35 +73,35 @@ public String getLongDescription() {

@Override
public DataSet plan(Graph<K, VV, EV> graph) throws Exception {
int lp = littleParallelism.getValue().intValue();
int parallelism = this.parallelism.getValue().intValue();

switch (order.getValue()) {
case DIRECTED:
if (computeTriadicCensus.getValue()) {
triadicCensus = graph
.run(new org.apache.flink.graph.library.clustering.directed.TriadicCensus<K, VV, EV>()
.setLittleParallelism(lp));
.setParallelism(parallelism));
}

@SuppressWarnings("unchecked")
DataSet<PrintableResult> directedResult = (DataSet<PrintableResult>) (DataSet<?>) graph
.run(new org.apache.flink.graph.library.clustering.directed.TriangleListing<K, VV, EV>()
.setSortTriangleVertices(sortTriangleVertices.getValue())
.setLittleParallelism(lp));
.setParallelism(parallelism));
return directedResult;

case UNDIRECTED:
if (computeTriadicCensus.getValue()) {
triadicCensus = graph
.run(new org.apache.flink.graph.library.clustering.undirected.TriadicCensus<K, VV, EV>()
.setLittleParallelism(lp));
.setParallelism(parallelism));
}

@SuppressWarnings("unchecked")
DataSet<PrintableResult> undirectedResult = (DataSet<PrintableResult>) (DataSet<?>) graph
.run(new org.apache.flink.graph.library.clustering.undirected.TriangleListing<K, VV, EV>()
.setSortTriangleVertices(sortTriangleVertices.getValue())
.setLittleParallelism(lp));
.setParallelism(parallelism));
return undirectedResult;

default:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import java.util.Map;
import java.util.TreeMap;

import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
import static org.apache.flink.graph.generator.CirculantGraph.MINIMUM_VERTEX_COUNT;

/**
Expand All @@ -46,9 +45,6 @@ public class CirculantGraph
private LongParameter vertexCount = new LongParameter(this, "vertex_count")
.setMinimumValue(MINIMUM_VERTEX_COUNT);

private LongParameter littleParallelism = new LongParameter(this, "little_parallelism")
.setDefaultValue(PARALLELISM_DEFAULT);

private List<OffsetRange> offsetRanges = new ArrayList<>();

@Override
Expand Down Expand Up @@ -118,7 +114,7 @@ public Graph<LongValue, NullValue, NullValue> generate(ExecutionEnvironment env)
}

return graph
.setParallelism(littleParallelism.getValue().intValue())
.setParallelism(parallelism.getValue().intValue())
.generate();
}
}
Loading

0 comments on commit d0cc2c1

Please sign in to comment.