Skip to content

Commit

Permalink
[FLINK-7006] [gelly] Base class using POJOs for Gelly algorithms
Browse files Browse the repository at this point in the history
Gelly algorithms commonly have a Result class extending a Tuple type and
implementing one of the Unary/Binary/TertiaryResult interfaces.

Add a Unary/Binary/TertiaryResultBase class implementing each interface
and convert the Result classes to POJOs extending the base result
classes.

This closes apache#4201
  • Loading branch information
greghogan committed Jul 5, 2017
1 parent 7c150a6 commit 98a1500
Show file tree
Hide file tree
Showing 48 changed files with 933 additions and 733 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -70,12 +70,7 @@ public String getIdentity() {
return WordUtils.capitalize(getName()) + WordUtils.capitalize(type.getValue()) + " (" + inputFilename + ")";
}

/**
* Generate the graph as configured.
*
* @param env execution environment
* @return input graph
*/
@Override
public Graph<K, NullValue, NullValue> create(ExecutionEnvironment env) throws Exception {
GraphCsvReader reader = Graph.fromCsvReader(inputFilename.getValue(), env)
.ignoreCommentsEdges(commentPrefix.getValue())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,7 @@ protected String getSimplifyShortString() {
return simplify.getShortString();
}

/**
* Generate the graph as configured.
*
* @param env Flink execution environment
* @return input graph
*/
@Override
public Graph<K, NullValue, NullValue> create(ExecutionEnvironment env)
throws Exception {
Graph<K, NullValue, NullValue> graph = super.create(env);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,12 +93,7 @@ protected long vertexCount() {
return 1L << scale.getValue();
}

/**
* Generate the graph as configured.
*
* @param env Flink execution environment
* @return input graph
*/
@Override
public Graph<LongValue, NullValue, NullValue> generate(ExecutionEnvironment env) throws Exception {
int lp = littleParallelism.getValue().intValue();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,12 +51,7 @@ public ChoiceParameter(ParameterizedBase owner, String name) {
super(owner, name);
}

/**
* Set the default value and add to the list of choices.
*
* @param defaultValue the default value.
* @return this
*/
@Override
public ChoiceParameter setDefaultValue(String defaultValue) {
super.setDefaultValue(defaultValue);
choices.add(defaultValue);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,7 @@ public StringParameter(ParameterizedBase owner, String name) {
super(owner, name);
}

/**
* Set the default value.
*
* @param defaultValue the default value.
* @return this
*/
@Override
public StringParameter setDefaultValue(String defaultValue) {
super.setDefaultValue(defaultValue);
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,15 +69,15 @@ public void testHashWithSmallDirectedRMatGraph() throws Exception {
case "short":
case "char":
case "integer":
checksum = 0x000000003d2f0a9aL;
checksum = 0x00000784d2c336cdL;
break;

case "long":
checksum = 0x000000016aba3720L;
checksum = 0x0000078e5ebf2927L;
break;

case "string":
checksum = 0x0000005bfef84facL;
checksum = 0x0000077eddf67481L;
break;

default:
Expand Down Expand Up @@ -115,15 +115,15 @@ public void testHashWithSmallUndirectedRMatGraph() throws Exception {
case "short":
case "char":
case "integer":
checksum = 0x0000000001f92b0cL;
checksum = 0x0000075b6b9a0ad0L;
break;

case "long":
checksum = 0x000000000bb355c6L;
checksum = 0x00000761619e7f3cL;
break;

case "string":
checksum = 0x00000002f7b5576aL;
checksum = 0x0000079b15eb30acL;
break;

default:
Expand Down Expand Up @@ -154,15 +154,15 @@ public void testHashWithLargeDirectedRMatGraph() throws Exception {
case "short":
case "char":
case "integer":
checksum = 0x00000248fef26209L;
checksum = 0x0003a986d6bedc53L;
break;

case "long":
checksum = 0x000002dcdf0fbb1bL;
checksum = 0x0003a8d91c92b884L;
break;

case "string":
checksum = 0x00035b760ab9da74L;
checksum = 0x0003a88fffc33f27L;
break;

default:
Expand Down Expand Up @@ -205,15 +205,15 @@ public void testHashWithLargeUndirectedRMatGraph() throws Exception {
case "short":
case "char":
case "integer":
checksum = 0x00000012dee4bf2cL;
checksum = 0x0003a95630aae344L;
break;

case "long":
checksum = 0x00000017a40efbdaL;
checksum = 0x0003a9b1e055d59dL;
break;

case "string":
checksum = 0x000159e8be3e370bL;
checksum = 0x0003aa1e3d8f2c6bL;
break;

default:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import org.apache.flink.api.common.accumulators.SerializedListAccumulator;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.graph.AnalyticHelper;

import java.io.IOException;
Expand All @@ -47,7 +46,6 @@ public Collect<T> run(DataSet<T> input)
throws Exception {
super.run(input);

ExecutionEnvironment env = input.getExecutionEnvironment();
serializer = input.getType().createSerializer(env.getConfig());

collectHelper = new CollectHelper<>(serializer);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,16 @@
public interface DataSetAnalytic<T, R> {

/**
* This method must be called after the program has executed.
* 1) "run" analytics and algorithms
* 2) call ExecutionEnvironment.execute()
* 3) get analytic results
* All {@code DataSetAnalytic} processing must be terminated by an
* {@link OutputFormat} and obtained via accumulators rather than
* returned by a {@link DataSet}.
*
* @return the result
* @param input input dataset
* @return this
* @throws Exception
*/
R getResult();

DataSetAnalytic<T, R> run(DataSet<T> input) throws Exception;

/**
* Execute the program and return the result.
Expand All @@ -62,13 +64,12 @@ public interface DataSetAnalytic<T, R> {
R execute(String jobName) throws Exception;

/**
* All {@code DataSetAnalytic} processing must be terminated by an
* {@link OutputFormat} and obtained via accumulators rather than
* returned by a {@link DataSet}.
* This method must be called after the program has executed.
* 1) "run" analytics and algorithms
* 2) call ExecutionEnvironment.execute()
* 3) get analytic results
*
* @param input input dataset
* @return this
* @throws Exception
* @return the result
*/
DataSetAnalytic<T, R> run(DataSet<T> input) throws Exception;
R getResult();
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.flink.graph.Vertex;
import org.apache.flink.graph.asm.degree.annotate.DegreeAnnotationFunctions.JoinEdgeDegreeWithVertexDegree;
import org.apache.flink.graph.asm.degree.annotate.directed.VertexDegrees.Degrees;
import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingBase;
import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingDataSet;
import org.apache.flink.util.Preconditions;

Expand Down Expand Up @@ -59,12 +60,7 @@ public EdgeDegreesPair<K, VV, EV> setParallelism(int parallelism) {
}

@Override
protected String getAlgorithmName() {
return EdgeDegreesPair.class.getName();
}

@Override
protected boolean mergeConfiguration(GraphAlgorithmWrappingDataSet other) {
protected boolean mergeConfiguration(GraphAlgorithmWrappingBase other) {
Preconditions.checkNotNull(other);

if (!EdgeDegreesPair.class.isAssignableFrom(other.getClass())) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.flink.graph.Vertex;
import org.apache.flink.graph.asm.degree.annotate.DegreeAnnotationFunctions.JoinEdgeWithVertexDegree;
import org.apache.flink.graph.asm.degree.annotate.directed.VertexDegrees.Degrees;
import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingBase;
import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingDataSet;
import org.apache.flink.util.Preconditions;

Expand Down Expand Up @@ -58,12 +59,7 @@ public EdgeSourceDegrees<K, VV, EV> setParallelism(int parallelism) {
}

@Override
protected String getAlgorithmName() {
return EdgeSourceDegrees.class.getName();
}

@Override
protected boolean mergeConfiguration(GraphAlgorithmWrappingDataSet other) {
protected boolean mergeConfiguration(GraphAlgorithmWrappingBase other) {
Preconditions.checkNotNull(other);

if (!EdgeSourceDegrees.class.isAssignableFrom(other.getClass())) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.flink.graph.Vertex;
import org.apache.flink.graph.asm.degree.annotate.DegreeAnnotationFunctions.JoinEdgeWithVertexDegree;
import org.apache.flink.graph.asm.degree.annotate.directed.VertexDegrees.Degrees;
import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingBase;
import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingDataSet;
import org.apache.flink.util.Preconditions;

Expand Down Expand Up @@ -58,12 +59,7 @@ public EdgeTargetDegrees<K, VV, EV> setParallelism(int parallelism) {
}

@Override
protected String getAlgorithmName() {
return EdgeTargetDegrees.class.getName();
}

@Override
protected boolean mergeConfiguration(GraphAlgorithmWrappingDataSet other) {
protected boolean mergeConfiguration(GraphAlgorithmWrappingBase other) {
Preconditions.checkNotNull(other);

if (!EdgeTargetDegrees.class.isAssignableFrom(other.getClass())) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.flink.graph.Vertex;
import org.apache.flink.graph.asm.degree.annotate.directed.VertexDegrees.Degrees;
import org.apache.flink.graph.utils.MurmurHash;
import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingBase;
import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingDataSet;
import org.apache.flink.graph.utils.proxy.OptionalBoolean;
import org.apache.flink.types.ByteValue;
Expand Down Expand Up @@ -85,12 +86,7 @@ public VertexDegrees<K, VV, EV> setParallelism(int parallelism) {
}

@Override
protected String getAlgorithmName() {
return VertexDegrees.class.getName();
}

@Override
protected boolean mergeConfiguration(GraphAlgorithmWrappingDataSet other) {
protected boolean mergeConfiguration(GraphAlgorithmWrappingBase other) {
Preconditions.checkNotNull(other);

if (!VertexDegrees.class.isAssignableFrom(other.getClass())) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.flink.graph.asm.degree.annotate.DegreeAnnotationFunctions.DegreeCount;
import org.apache.flink.graph.asm.degree.annotate.DegreeAnnotationFunctions.JoinVertexWithVertexDegree;
import org.apache.flink.graph.asm.degree.annotate.DegreeAnnotationFunctions.MapEdgeToTargetId;
import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingBase;
import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingDataSet;
import org.apache.flink.graph.utils.proxy.OptionalBoolean;
import org.apache.flink.types.LongValue;
Expand Down Expand Up @@ -78,12 +79,7 @@ public VertexInDegree<K, VV, EV> setParallelism(int parallelism) {
}

@Override
protected String getAlgorithmName() {
return VertexInDegree.class.getName();
}

@Override
protected boolean mergeConfiguration(GraphAlgorithmWrappingDataSet other) {
protected boolean mergeConfiguration(GraphAlgorithmWrappingBase other) {
Preconditions.checkNotNull(other);

if (!VertexInDegree.class.isAssignableFrom(other.getClass())) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.flink.graph.asm.degree.annotate.DegreeAnnotationFunctions.DegreeCount;
import org.apache.flink.graph.asm.degree.annotate.DegreeAnnotationFunctions.JoinVertexWithVertexDegree;
import org.apache.flink.graph.asm.degree.annotate.DegreeAnnotationFunctions.MapEdgeToSourceId;
import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingBase;
import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingDataSet;
import org.apache.flink.graph.utils.proxy.OptionalBoolean;
import org.apache.flink.types.LongValue;
Expand Down Expand Up @@ -78,12 +79,7 @@ public VertexOutDegree<K, VV, EV> setParallelism(int parallelism) {
}

@Override
protected String getAlgorithmName() {
return VertexOutDegree.class.getName();
}

@Override
protected boolean mergeConfiguration(GraphAlgorithmWrappingDataSet other) {
protected boolean mergeConfiguration(GraphAlgorithmWrappingBase other) {
Preconditions.checkNotNull(other);

if (!VertexOutDegree.class.isAssignableFrom(other.getClass())) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.Vertex;
import org.apache.flink.graph.asm.degree.annotate.DegreeAnnotationFunctions.JoinEdgeDegreeWithVertexDegree;
import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingBase;
import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingDataSet;
import org.apache.flink.graph.utils.proxy.OptionalBoolean;
import org.apache.flink.types.LongValue;
Expand Down Expand Up @@ -80,12 +81,7 @@ public EdgeDegreePair<K, VV, EV> setParallelism(int parallelism) {
}

@Override
protected String getAlgorithmName() {
return EdgeDegreePair.class.getName();
}

@Override
protected boolean mergeConfiguration(GraphAlgorithmWrappingDataSet other) {
protected boolean mergeConfiguration(GraphAlgorithmWrappingBase other) {
Preconditions.checkNotNull(other);

if (!EdgeSourceDegree.class.isAssignableFrom(other.getClass())) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.Vertex;
import org.apache.flink.graph.asm.degree.annotate.DegreeAnnotationFunctions.JoinEdgeWithVertexDegree;
import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingBase;
import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingDataSet;
import org.apache.flink.graph.utils.proxy.OptionalBoolean;
import org.apache.flink.types.LongValue;
Expand Down Expand Up @@ -78,12 +79,7 @@ public EdgeSourceDegree<K, VV, EV> setParallelism(int parallelism) {
}

@Override
protected String getAlgorithmName() {
return EdgeSourceDegree.class.getName();
}

@Override
protected boolean mergeConfiguration(GraphAlgorithmWrappingDataSet other) {
protected boolean mergeConfiguration(GraphAlgorithmWrappingBase other) {
Preconditions.checkNotNull(other);

if (!EdgeSourceDegree.class.isAssignableFrom(other.getClass())) {
Expand Down
Loading

0 comments on commit 98a1500

Please sign in to comment.