Skip to content

Commit

Permalink
[FLINK-3799] [gelly] Graph checksum should execute single job
Browse files Browse the repository at this point in the history
This closes apache#1922
  • Loading branch information
greghogan committed Apr 26, 2016
1 parent a58cd80 commit 06bf4bf
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@
package org.apache.flink.graph.scala

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.Utils.ChecksumHashCode
import org.apache.flink.api.scala._
import org.apache.flink.api.scala.utils._
import org.apache.flink.api.java.Utils
import org.apache.flink.graph.{Edge, Vertex}
import org.apache.flink.util.AbstractID

import scala.reflect.ClassTag

Expand All @@ -35,16 +35,26 @@ package object utils {
TypeInformation : ClassTag](val self: Graph[K, VV, EV]) {

/**
  * Convenience method to get the count (number of elements) of a Graph
  * as well as the checksum (sum over element hashes). The vertex and
  * edge DataSets are processed in a single job and the resultant counts
  * and checksums are merged locally.
  *
  * @return the checksum over the vertices and edges
  */
@throws(classOf[Exception])
def checksumHashCode(): Utils.ChecksumHashCode = {
  // A fresh random ID per sink; the same ID is used below to read the
  // sink's result back out of the job's accumulators.
  val verticesId = new AbstractID().toString
  self.getVertices
    .output(new Utils.ChecksumHashCodeHelper[Vertex[K, VV]](verticesId))
    .name("ChecksumHashCode vertices")

  val edgesId = new AbstractID().toString
  self.getEdges
    .output(new Utils.ChecksumHashCodeHelper[Edge[K, EV]](edgesId))
    .name("ChecksumHashCode edges")

  // Both sinks are registered before this single execute() call, so the
  // vertex and edge DataSets are handled by one job rather than two.
  val res = self.getWrappedGraph.getContext.execute()

  // Fold the edge result into the vertex result and return the total.
  val checksum = res.getAccumulatorResult[Utils.ChecksumHashCode](verticesId)
  checksum.add(res.getAccumulatorResult[Utils.ChecksumHashCode](edgesId))
  checksum
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,21 +18,36 @@

package org.apache.flink.graph.utils;

import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.java.Utils;
import org.apache.flink.api.java.Utils.ChecksumHashCode;
import org.apache.flink.api.java.utils.DataSetUtils;
import org.apache.flink.graph.Edge;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.Vertex;
import org.apache.flink.util.AbstractID;

public class GraphUtils {

/**
* Computes the checksum over the Graph
* Convenience method to get the count (number of elements) of a Graph
* as well as the checksum (sum over element hashes). The vertex and
* edge DataSets are processed in a single job and the resultant counts
* and checksums are merged locally.
*
* @return the checksum over the vertices and edges.
* @param graph Graph over which to compute the count and checksum
* @return the checksum over the vertices and edges
*/
public static Utils.ChecksumHashCode checksumHashCode(Graph graph) throws Exception {
ChecksumHashCode checksum = DataSetUtils.checksumHashCode(graph.getVertices());
checksum.add(DataSetUtils.checksumHashCode(graph.getEdges()));
public static <K, VV, EV> Utils.ChecksumHashCode checksumHashCode(Graph<K, VV, EV> graph) throws Exception {
final String verticesId = new AbstractID().toString();
graph.getVertices().output(new Utils.ChecksumHashCodeHelper<Vertex<K, VV>>(verticesId)).name("ChecksumHashCode vertices");

final String edgesId = new AbstractID().toString();
graph.getEdges().output(new Utils.ChecksumHashCodeHelper<Edge<K, EV>>(edgesId)).name("ChecksumHashCode edges");

JobExecutionResult res = graph.getContext().execute();

Utils.ChecksumHashCode checksum = res.<Utils.ChecksumHashCode>getAccumulatorResult(verticesId);
checksum.add(res.<Utils.ChecksumHashCode>getAccumulatorResult(edgesId));

return checksum;
}
}

0 comments on commit 06bf4bf

Please sign in to comment.