[FLINK-4264] [gelly] New GraphMetrics driver
Updates the VertexMetrics analytic, adds directed and undirected
EdgeMetrics analytics, and includes a new GraphMetrics driver.

This closes apache#2295
greghogan committed Aug 24, 2016
1 parent ad8e665 commit 58850f2
Showing 16 changed files with 1,600 additions and 46 deletions.
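As a quick orientation, here is a minimal sketch (imports omitted; it uses only classes that appear in this commit) of running the new directed VertexMetrics and EdgeMetrics analytics directly, outside the driver:

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// generate a small RMat test graph: 2^10 vertices, 16 edges per vertex
Graph<LongValue, NullValue, NullValue> graph = new RMatGraph<>(
    env, new JDKRandomGeneratorFactory(), 1L << 10, 16 * (1L << 10))
    .generate();

// analytics are attached to the graph and read after the job has run
GraphAnalytic vertexMetrics = graph
    .run(new org.apache.flink.graph.library.metric.directed.VertexMetrics<LongValue, NullValue, NullValue>());
GraphAnalytic edgeMetrics = graph
    .run(new org.apache.flink.graph.library.metric.directed.EdgeMetrics<LongValue, NullValue, NullValue>());

env.execute();

System.out.println(vertexMetrics.getResult());
System.out.println(edgeMetrics.getResult());

The GraphMetrics driver below wires the same calls behind the --directed, --input, and --type options.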
GraphMetrics.java (new file)
@@ -0,0 +1,232 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.graph.driver;

import org.apache.commons.lang3.StringEscapeUtils;
import org.apache.commons.lang3.text.WordUtils;
import org.apache.commons.math3.random.JDKRandomGenerator;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.CsvOutputFormat;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.GraphAnalytic;
import org.apache.flink.graph.GraphCsvReader;
import org.apache.flink.graph.asm.translate.LongValueToIntValue;
import org.apache.flink.graph.asm.translate.TranslateGraphIds;
import org.apache.flink.graph.generator.RMatGraph;
import org.apache.flink.graph.generator.random.JDKRandomGeneratorFactory;
import org.apache.flink.graph.generator.random.RandomGenerableFactory;
import org.apache.flink.types.IntValue;
import org.apache.flink.types.LongValue;
import org.apache.flink.types.NullValue;
import org.apache.flink.types.StringValue;

import java.text.NumberFormat;

/**
* Computes vertex and edge metrics on a directed or undirected graph.
*
* @see org.apache.flink.graph.library.metric.directed.EdgeMetrics
* @see org.apache.flink.graph.library.metric.directed.VertexMetrics
* @see org.apache.flink.graph.library.metric.undirected.EdgeMetrics
* @see org.apache.flink.graph.library.metric.undirected.VertexMetrics
*/
public class GraphMetrics {

public static final int DEFAULT_SCALE = 10;

public static final int DEFAULT_EDGE_FACTOR = 16;

public static final boolean DEFAULT_CLIP_AND_FLIP = true;

private static void printUsage() {
System.out.println(WordUtils.wrap("Computes vertex and edge metrics on a directed or undirected graph.", 80));
System.out.println();
System.out.println("usage: GraphMetrics --directed <true | false> --input <csv | rmat [options]>");
System.out.println();
System.out.println("options:");
System.out.println(" --input csv --type <integer | string> [--simplify <true | false>] --input_filename FILENAME [--input_line_delimiter LINE_DELIMITER] [--input_field_delimiter FIELD_DELIMITER]");
System.out.println(" --input rmat [--scale SCALE] [--edge_factor EDGE_FACTOR]");
}
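/*
 * Example invocation (illustrative only; the jar path is a placeholder and
 * the parameter values are arbitrary):
 *
 *   ./bin/flink run -c org.apache.flink.graph.driver.GraphMetrics \
 *       path/to/gelly-examples.jar --directed true --input rmat --scale 16 --edge_factor 16
 */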

public static void main(String[] args) throws Exception {
// Set up the execution environment
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.getConfig().enableObjectReuse();

ParameterTool parameters = ParameterTool.fromArgs(args);
if (! parameters.has("directed")) {
printUsage();
return;
}
boolean directedAlgorithm = parameters.getBoolean("directed");

GraphAnalytic vm;
GraphAnalytic em;
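// results of these analytics are only read after env.execute() below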

switch (parameters.get("input", "")) {
case "csv": {
String lineDelimiter = StringEscapeUtils.unescapeJava(
parameters.get("input_line_delimiter", CsvOutputFormat.DEFAULT_LINE_DELIMITER));

String fieldDelimiter = StringEscapeUtils.unescapeJava(
parameters.get("input_field_delimiter", CsvOutputFormat.DEFAULT_FIELD_DELIMITER));

GraphCsvReader reader = Graph
.fromCsvReader(parameters.get("input_filename"), env)
.ignoreCommentsEdges("#")
.lineDelimiterEdges(lineDelimiter)
.fieldDelimiterEdges(fieldDelimiter);

switch (parameters.get("type", "")) {
case "integer": {
Graph<LongValue, NullValue, NullValue> graph = reader
.keyType(LongValue.class);

if (directedAlgorithm) {
if (parameters.getBoolean("simplify", false)) {
graph = graph
.run(new org.apache.flink.graph.asm.simple.directed.Simplify<LongValue, NullValue, NullValue>());
}

vm = graph
.run(new org.apache.flink.graph.library.metric.directed.VertexMetrics<LongValue, NullValue, NullValue>());
em = graph
.run(new org.apache.flink.graph.library.metric.directed.EdgeMetrics<LongValue, NullValue, NullValue>());
} else {
if (parameters.getBoolean("simplify", false)) {
graph = graph
.run(new org.apache.flink.graph.asm.simple.undirected.Simplify<LongValue, NullValue, NullValue>(false));
}

vm = graph
.run(new org.apache.flink.graph.library.metric.undirected.VertexMetrics<LongValue, NullValue, NullValue>());
em = graph
.run(new org.apache.flink.graph.library.metric.undirected.EdgeMetrics<LongValue, NullValue, NullValue>());
}
} break;

case "string": {
Graph<StringValue, NullValue, NullValue> graph = reader
.keyType(StringValue.class);

if (directedAlgorithm) {
if (parameters.getBoolean("simplify", false)) {
graph = graph
.run(new org.apache.flink.graph.asm.simple.directed.Simplify<StringValue, NullValue, NullValue>());
}

vm = graph
.run(new org.apache.flink.graph.library.metric.directed.VertexMetrics<StringValue, NullValue, NullValue>());
em = graph
.run(new org.apache.flink.graph.library.metric.directed.EdgeMetrics<StringValue, NullValue, NullValue>());
} else {
if (parameters.getBoolean("simplify", false)) {
graph = graph
.run(new org.apache.flink.graph.asm.simple.undirected.Simplify<StringValue, NullValue, NullValue>(false));
}

vm = graph
.run(new org.apache.flink.graph.library.metric.undirected.VertexMetrics<StringValue, NullValue, NullValue>());
em = graph
.run(new org.apache.flink.graph.library.metric.undirected.EdgeMetrics<StringValue, NullValue, NullValue>());
}
} break;

default:
printUsage();
return;
}
} break;

case "rmat": {
int scale = parameters.getInt("scale", DEFAULT_SCALE);
int edgeFactor = parameters.getInt("edge_factor", DEFAULT_EDGE_FACTOR);

RandomGenerableFactory<JDKRandomGenerator> rnd = new JDKRandomGeneratorFactory();

long vertexCount = 1L << scale;
long edgeCount = vertexCount * edgeFactor;
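// e.g. with the defaults (scale 10, edge factor 16) this requests
// 2^10 = 1,024 vertices and 1,024 * 16 = 16,384 edges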


Graph<LongValue, NullValue, NullValue> graph = new RMatGraph<>(env, rnd, vertexCount, edgeCount)
.generate();
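// graphs of scale <= 32 are translated to the more compact IntValue
// vertex ID type; larger graphs retain LongValue IDs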

if (directedAlgorithm) {
if (scale > 32) {
Graph<LongValue, NullValue, NullValue> newGraph = graph
.run(new org.apache.flink.graph.asm.simple.directed.Simplify<LongValue, NullValue, NullValue>());

vm = newGraph
.run(new org.apache.flink.graph.library.metric.directed.VertexMetrics<LongValue, NullValue, NullValue>());
em = newGraph
.run(new org.apache.flink.graph.library.metric.directed.EdgeMetrics<LongValue, NullValue, NullValue>());
} else {
Graph<IntValue, NullValue, NullValue> newGraph = graph
.run(new TranslateGraphIds<LongValue, IntValue, NullValue, NullValue>(new LongValueToIntValue()))
.run(new org.apache.flink.graph.asm.simple.directed.Simplify<IntValue, NullValue, NullValue>());

vm = newGraph
.run(new org.apache.flink.graph.library.metric.directed.VertexMetrics<IntValue, NullValue, NullValue>());
em = newGraph
.run(new org.apache.flink.graph.library.metric.directed.EdgeMetrics<IntValue, NullValue, NullValue>());
}
} else {
boolean clipAndFlip = parameters.getBoolean("clip_and_flip", DEFAULT_CLIP_AND_FLIP);

if (scale > 32) {
Graph<LongValue, NullValue, NullValue> newGraph = graph
.run(new org.apache.flink.graph.asm.simple.undirected.Simplify<LongValue, NullValue, NullValue>(clipAndFlip));

vm = newGraph
.run(new org.apache.flink.graph.library.metric.undirected.VertexMetrics<LongValue, NullValue, NullValue>());
em = newGraph
.run(new org.apache.flink.graph.library.metric.undirected.EdgeMetrics<LongValue, NullValue, NullValue>());
} else {
Graph<IntValue, NullValue, NullValue> newGraph = graph
.run(new TranslateGraphIds<LongValue, IntValue, NullValue, NullValue>(new LongValueToIntValue()))
.run(new org.apache.flink.graph.asm.simple.undirected.Simplify<IntValue, NullValue, NullValue>(clipAndFlip));

vm = newGraph
.run(new org.apache.flink.graph.library.metric.undirected.VertexMetrics<IntValue, NullValue, NullValue>());
em = newGraph
.run(new org.apache.flink.graph.library.metric.undirected.EdgeMetrics<IntValue, NullValue, NullValue>());
}
}
} break;

default:
printUsage();
return;
}

env.execute("Graph Metrics");
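// each analytic result is a single value object whose toString() joins the
// metrics with ';'; the replace below prints one metric per line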

System.out.print("Vertex metrics:\n ");
System.out.println(vm.getResult().toString().replace(";", "\n "));
System.out.print("\nEdge metrics:\n ");
System.out.println(em.getResult().toString().replace(";", "\n "));

JobExecutionResult result = env.getLastJobExecutionResult();

NumberFormat nf = NumberFormat.getInstance();
System.out.println("\nExecution runtime: " + nf.format(result.getNetRuntime()) + " ms");
}
}
ClusteringCoefficient.java (driver)
@@ -72,7 +72,7 @@ private static void printUsage() {
System.out.println(WordUtils.wrap("This algorithm returns tuples containing the vertex ID, the degree of" +
" the vertex, and the number of edges between vertex neighbors.", 80));
System.out.println();
System.out.println("usage: ClusteringCoefficient --directed <true | false> --input <csv | rmat [options]> --output <print | hash | csv [options]");
System.out.println("usage: ClusteringCoefficient --directed <true | false> --input <csv | rmat [options]> --output <print | hash | csv [options]>");
System.out.println();
System.out.println("options:");
System.out.println(" --input csv --type <integer | string> --input_filename FILENAME [--input_line_delimiter LINE_DELIMITER] [--input_field_delimiter FIELD_DELIMITER]");
@@ -174,7 +174,8 @@ public static void main(String[] args) throws Exception {
gcc = newGraph
.run(new org.apache.flink.graph.library.clustering.directed.GlobalClusteringCoefficient<LongValue, NullValue, NullValue>());
lcc = newGraph
.run(new org.apache.flink.graph.library.clustering.directed.LocalClusteringCoefficient<LongValue, NullValue, NullValue>());
.run(new org.apache.flink.graph.library.clustering.directed.LocalClusteringCoefficient<LongValue, NullValue, NullValue>()
.setIncludeZeroDegreeVertices(false));
} else {
Graph<IntValue, NullValue, NullValue> newGraph = graph
.run(new TranslateGraphIds<LongValue, IntValue, NullValue, NullValue>(new LongValueToIntValue()))
@@ -183,7 +184,8 @@ public static void main(String[] args) throws Exception {
gcc = newGraph
.run(new org.apache.flink.graph.library.clustering.directed.GlobalClusteringCoefficient<IntValue, NullValue, NullValue>());
lcc = newGraph
.run(new org.apache.flink.graph.library.clustering.directed.LocalClusteringCoefficient<IntValue, NullValue, NullValue>());
.run(new org.apache.flink.graph.library.clustering.directed.LocalClusteringCoefficient<IntValue, NullValue, NullValue>()
.setIncludeZeroDegreeVertices(false));
}
} else {
boolean clipAndFlip = parameters.getBoolean("clip_and_flip", DEFAULT_CLIP_AND_FLIP);
@@ -195,7 +197,8 @@ public static void main(String[] args) throws Exception {
gcc = newGraph
.run(new org.apache.flink.graph.library.clustering.undirected.GlobalClusteringCoefficient<LongValue, NullValue, NullValue>());
lcc = newGraph
.run(new org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient<LongValue, NullValue, NullValue>());
.run(new org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient<LongValue, NullValue, NullValue>()
.setIncludeZeroDegreeVertices(false));
} else {
Graph<IntValue, NullValue, NullValue> newGraph = graph
.run(new TranslateGraphIds<LongValue, IntValue, NullValue, NullValue>(new LongValueToIntValue()))
@@ -204,7 +207,8 @@ public static void main(String[] args) throws Exception {
gcc = newGraph
.run(new org.apache.flink.graph.library.clustering.undirected.GlobalClusteringCoefficient<IntValue, NullValue, NullValue>());
lcc = newGraph
.run(new org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient<IntValue, NullValue, NullValue>());
.run(new org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient<IntValue, NullValue, NullValue>()
.setIncludeZeroDegreeVertices(false));
}
}
} break;
HITS.java (driver)
@@ -65,7 +65,7 @@ private static void printUsage() {
" scores for every vertex in a directed graph. A good \"hub\" links to good \"authorities\"" +
" and good \"authorities\" are linked from good \"hubs\".", 80));
System.out.println();
System.out.println("usage: HITS --input <csv | rmat [options]> --output <print | hash | csv [options]");
System.out.println("usage: HITS --input <csv | rmat [options]> --output <print | hash | csv [options]>");
System.out.println();
System.out.println("options:");
System.out.println(" --input csv --type <integer | string> --input_filename FILENAME [--input_line_delimiter LINE_DELIMITER] [--input_field_delimiter FIELD_DELIMITER]");
JaccardIndex.java (driver)
@@ -69,7 +69,7 @@ private static void printUsage() {
System.out.println(WordUtils.wrap("This algorithm returns 4-tuples containing two vertex IDs, the" +
" number of shared neighbors, and the number of distinct neighbors.", 80));
System.out.println();
System.out.println("usage: JaccardIndex --input <csv | rmat [options]> --output <print | hash | csv [options]");
System.out.println("usage: JaccardIndex --input <csv | rmat [options]> --output <print | hash | csv [options]>");
System.out.println();
System.out.println("options:");
System.out.println(" --input csv --type <integer | string> --input_filename FILENAME [--input_line_delimiter LINE_DELIMITER] [--input_field_delimiter FIELD_DELIMITER]");
TriangleListing.java (driver)
@@ -66,7 +66,7 @@ private static void printUsage() {
System.out.println(WordUtils.wrap("This algorithm returns tuples containing the vertex IDs for each triangle and" +
" for directed graphs a bitmask indicating the presence of the six potential connecting edges.", 80));
System.out.println();
System.out.println("usage: TriangleListing --directed <true | false> --input <csv | rmat [options]> --output <print | hash | csv [options]");
System.out.println("usage: TriangleListing --directed <true | false> --input <csv | rmat [options]> --output <print | hash | csv [options]>");
System.out.println();
System.out.println("options:");
System.out.println(" --input csv --type <integer | string> --input_filename FILENAME [--input_line_delimiter LINE_DELIMITER] [--input_field_delimiter FIELD_DELIMITER]");
VertexDegrees.java
@@ -86,7 +86,7 @@ public VertexDegrees<K, VV, EV> setParallelism(int parallelism) {

@Override
protected String getAlgorithmName() {
return VertexOutDegree.class.getName();
return VertexDegrees.class.getName();
}

@Override
LocalClusteringCoefficient.java (directed)
@@ -32,6 +32,7 @@
import org.apache.flink.graph.library.clustering.directed.LocalClusteringCoefficient.Result;
import org.apache.flink.graph.utils.Murmur3_32;
import org.apache.flink.graph.utils.proxy.GraphAlgorithmDelegatingDataSet;
import org.apache.flink.graph.utils.proxy.OptionalBoolean;
import org.apache.flink.types.CopyableValue;
import org.apache.flink.types.LongValue;
import org.apache.flink.util.Collector;
@@ -59,8 +60,25 @@ public class LocalClusteringCoefficient<K extends Comparable<K> & CopyableValue<
extends GraphAlgorithmDelegatingDataSet<K, VV, EV, Result<K>> {

// Optional configuration
private OptionalBoolean includeZeroDegreeVertices = new OptionalBoolean(true, true);

private int littleParallelism = PARALLELISM_DEFAULT;

/**
* By default the vertex set is checked for zero degree vertices. When this
* flag is disabled, only clustering coefficient scores for vertices with
* a degree of at least one will be produced.
*
* @param includeZeroDegreeVertices whether to output scores for vertices
* with a degree of zero
* @return this
*/
public LocalClusteringCoefficient<K, VV, EV> setIncludeZeroDegreeVertices(boolean includeZeroDegreeVertices) {
this.includeZeroDegreeVertices.set(includeZeroDegreeVertices);

return this;
}

/**
* Override the parallelism of operators processing small amounts of data.
*
@@ -90,6 +108,16 @@ protected boolean mergeConfiguration(GraphAlgorithmDelegatingDataSet other) {

LocalClusteringCoefficient rhs = (LocalClusteringCoefficient) other;

// verify that configurations can be merged

if (includeZeroDegreeVertices.conflictsWith(rhs.includeZeroDegreeVertices)) {
return false;
}

// merge configurations

includeZeroDegreeVertices.mergeWith(rhs.includeZeroDegreeVertices);

littleParallelism = Math.min(littleParallelism, rhs.littleParallelism);

return true;
@@ -128,8 +156,8 @@ public DataSet<Result<K>> runInternal(Graph<K, VV, EV> input)
// u, deg(u)
DataSet<Vertex<K, Degrees>> vertexDegree = input
.run(new VertexDegrees<K, VV, EV>()
.setParallelism(littleParallelism)
.setIncludeZeroDegreeVertices(true));
.setIncludeZeroDegreeVertices(includeZeroDegreeVertices.get())
.setParallelism(littleParallelism));

// u, deg(u), triangle count
return vertexDegree
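For reference, a minimal usage sketch of the new option (directed variant, imports omitted; the RMat input is only illustrative):

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

Graph<LongValue, NullValue, NullValue> graph = new RMatGraph<>(
    env, new JDKRandomGeneratorFactory(), 1L << 10, 16 * (1L << 10))
    .generate()
    .run(new org.apache.flink.graph.asm.simple.directed.Simplify<LongValue, NullValue, NullValue>());

// omit scores for isolated vertices
DataSet<LocalClusteringCoefficient.Result<LongValue>> scores = graph
    .run(new LocalClusteringCoefficient<LongValue, NullValue, NullValue>()
        .setIncludeZeroDegreeVertices(false));

scores.print();

With the default setting, scores are also produced for zero-degree vertices, which matches the previous behavior.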