Skip to content

Commit

Permalink
[FLINK-6382] [gelly] Support additional types for generated graphs in…
Browse files Browse the repository at this point in the history
… Gelly examples

The Gelly examples current support IntValue, LongValue, and StringValue
for RMatGraph. Allow transformations and tests for all generated graphs
for ByteValue, Byte, ShortValue, Short, CharValue, Character, Integer,
Long, and String.

This closes apache#3779
  • Loading branch information
greghogan committed May 1, 2017
1 parent d49efbd commit 3369578
Show file tree
Hide file tree
Showing 38 changed files with 2,054 additions and 463 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import org.apache.flink.graph.drivers.output.Print;
import org.apache.flink.graph.drivers.parameter.ChoiceParameter;
import org.apache.flink.graph.drivers.parameter.ParameterizedBase;
import org.apache.flink.types.CopyableValue;

/**
* Driver for directed and undirected graph metrics analytics.
Expand All @@ -36,7 +35,7 @@
* @see org.apache.flink.graph.library.metric.undirected.EdgeMetrics
* @see org.apache.flink.graph.library.metric.undirected.VertexMetrics
*/
public class GraphMetrics<K extends Comparable<K> & CopyableValue<K>, VV, EV>
public class GraphMetrics<K extends Comparable<K>, VV, EV>
extends ParameterizedBase
implements Driver<K, VV, EV>, Hash, Print {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.flink.graph.drivers.output.CSV;
import org.apache.flink.graph.drivers.output.Hash;
import org.apache.flink.graph.drivers.output.Print;
import org.apache.flink.graph.drivers.parameter.BooleanParameter;
import org.apache.flink.graph.drivers.parameter.LongParameter;
import org.apache.flink.graph.library.similarity.JaccardIndex.Result;
import org.apache.flink.types.CopyableValue;
Expand Down Expand Up @@ -57,6 +58,8 @@ public class JaccardIndex<K extends CopyableValue<K>, VV, EV>
private LongParameter littleParallelism = new LongParameter(this, "little_parallelism")
.setDefaultValue(PARALLELISM_DEFAULT);

private BooleanParameter mirrorResults = new BooleanParameter(this, "mirror_results");

@Override
public String getName() {
return this.getClass().getSimpleName();
Expand Down Expand Up @@ -88,6 +91,7 @@ protected DataSet<Result<K>> simplePlan(Graph<K, VV, EV> graph) throws Exception
.run(new org.apache.flink.graph.library.similarity.JaccardIndex<K, VV, EV>()
.setMinimumScore(minNumerator.getValue().intValue(), minDenominator.getValue().intValue())
.setMaximumScore(maxNumerator.getValue().intValue(), maxDenominator.getValue().intValue())
.setMirrorResults(mirrorResults.getValue())
.setLittleParallelism(lp));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.drivers.parameter.LongParameter;
import org.apache.flink.graph.drivers.parameter.ParameterizedBase;
import org.apache.flink.types.LongValue;
import org.apache.flink.types.NullValue;

Expand All @@ -32,8 +31,7 @@
* Generate a {@link org.apache.flink.graph.generator.CompleteGraph}.
*/
public class CompleteGraph
extends ParameterizedBase
implements Input<LongValue, NullValue, NullValue> {
extends GeneratedGraph<LongValue> {

private LongParameter vertexCount = new LongParameter(this, "vertex_count")
.setMinimumValue(MINIMUM_VERTEX_COUNT);
Expand All @@ -48,11 +46,16 @@ public String getName() {

@Override
public String getIdentity() {
return getName() + " (" + vertexCount.getValue() + ")";
return getTypeName() + " " + getName() + " (" + vertexCount.getValue() + ")";
}

@Override
public Graph<LongValue, NullValue, NullValue> create(ExecutionEnvironment env) {
protected long vertexCount() {
return vertexCount.getValue();
}

@Override
protected Graph<LongValue, NullValue, NullValue> generate(ExecutionEnvironment env) throws Exception {
return new org.apache.flink.graph.generator.CompleteGraph(env, vertexCount.getValue())
.setParallelism(littleParallelism.getValue().intValue())
.generate();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.drivers.parameter.LongParameter;
import org.apache.flink.graph.drivers.parameter.ParameterizedBase;
import org.apache.flink.types.LongValue;
import org.apache.flink.types.NullValue;

Expand All @@ -32,8 +31,7 @@
* Generate a {@link org.apache.flink.graph.generator.CycleGraph}.
*/
public class CycleGraph
extends ParameterizedBase
implements Input<LongValue, NullValue, NullValue> {
extends GeneratedGraph<LongValue> {

private LongParameter vertexCount = new LongParameter(this, "vertex_count")
.setMinimumValue(MINIMUM_VERTEX_COUNT);
Expand All @@ -48,11 +46,16 @@ public String getName() {

@Override
public String getIdentity() {
return getName() + " (" + vertexCount + ")";
return getTypeName() + " " + getName() + " (" + vertexCount + ")";
}

@Override
public Graph<LongValue, NullValue, NullValue> create(ExecutionEnvironment env) {
protected long vertexCount() {
return vertexCount.getValue();
}

@Override
public Graph<LongValue, NullValue, NullValue> generate(ExecutionEnvironment env) {
return new org.apache.flink.graph.generator.CycleGraph(env, vertexCount.getValue())
.setParallelism(littleParallelism.getValue().intValue())
.generate();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.drivers.parameter.LongParameter;
import org.apache.flink.graph.drivers.parameter.ParameterizedBase;
import org.apache.flink.types.LongValue;
import org.apache.flink.types.NullValue;

Expand All @@ -31,8 +30,7 @@
* Generate an {@link org.apache.flink.graph.generator.EmptyGraph}.
*/
public class EmptyGraph
extends ParameterizedBase
implements Input<LongValue, NullValue, NullValue> {
extends GeneratedGraph<LongValue> {

private LongParameter vertexCount = new LongParameter(this, "vertex_count")
.setMinimumValue(MINIMUM_VERTEX_COUNT);
Expand All @@ -44,11 +42,16 @@ public String getName() {

@Override
public String getIdentity() {
return getName() + " (" + vertexCount + ")";
return getTypeName() + " " + getName() + " (" + vertexCount + ")";
}

@Override
public Graph<LongValue, NullValue, NullValue> create(ExecutionEnvironment env) {
protected long vertexCount() {
return vertexCount.getValue();
}

@Override
public Graph<LongValue, NullValue, NullValue> generate(ExecutionEnvironment env) {
return new org.apache.flink.graph.generator.EmptyGraph(env, vertexCount.getValue())
.generate();
}
Expand Down
Loading

0 comments on commit 3369578

Please sign in to comment.