diff --git a/examples/src/main/scala/org/apache/spark/examples/graphx/SynthBenchmark.scala b/examples/src/main/scala/org/apache/spark/examples/graphx/SynthBenchmark.scala
new file mode 100644
index 0000000000000..551c339b19523
--- /dev/null
+++ b/examples/src/main/scala/org/apache/spark/examples/graphx/SynthBenchmark.scala
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.examples.graphx
+
+import org.apache.spark.SparkContext._
+import org.apache.spark.graphx.PartitionStrategy
+import org.apache.spark.{SparkContext, SparkConf}
+import org.apache.spark.graphx.util.GraphGenerators
+import java.io.{PrintWriter, FileOutputStream}
+
+/**
+ * The SynthBenchmark application can be used to run various GraphX algorithms on
+ * synthetic log-normal graphs. The intent of this code is to enable users to
+ * profile the GraphX system without access to large graph datasets.
+ */
+object SynthBenchmark {
+
+  /**
+   * To run this program use the following:
+   *
+   * MASTER=spark://foobar bin/run-example graphx.SynthBenchmark -app=pagerank
+   *
+   * Options:
+   *   -app "pagerank" or "cc" for pagerank or connected components. (Default: pagerank)
+   *   -niter the number of iterations of pagerank to use (Default: 10)
+   *   -nverts the number of vertices in the graph (Default: 100000)
+   *   -numEPart the number of edge partitions in the graph (Default: number of cores)
+   *   -partStrategy the graph partitioning strategy to use
+   *   -mu the mean parameter for the log-normal graph (Default: 4.0)
+   *   -sigma the stdev parameter for the log-normal graph (Default: 1.3)
+   *   -degFile the local file to save the degree information (Default: Empty)
+   */
+  def main(args: Array[String]) {
+    val options = args.map {
+      arg =>
+        arg.dropWhile(_ == '-').split('=') match {
+          case Array(opt, v) => (opt -> v)
+          case _ => throw new IllegalArgumentException("Invalid argument: " + arg)
+        }
+    }
+
+    var app = "pagerank"
+    var niter = 10
+    var numVertices = 100000
+    var numEPart: Option[Int] = None
+    var partitionStrategy: Option[PartitionStrategy] = None
+    var mu: Double = 4.0
+    var sigma: Double = 1.3
+    var degFile: String = ""
+
+    options.foreach {
+      case ("app", v) => app = v
+      case ("niter", v) => niter = v.toInt
+      case ("nverts", v) => numVertices = v.toInt
+      case ("numEPart", v) => numEPart = Some(v.toInt)
+      case ("partStrategy", v) => partitionStrategy = Some(PartitionStrategy.fromString(v))
+      case ("mu", v) => mu = v.toDouble
+      case ("sigma", v) => sigma = v.toDouble
+      case ("degFile", v) => degFile = v
+      case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt)
+    }
+
+    val conf = new SparkConf()
+      .setAppName(s"GraphX Synth Benchmark (nverts = $numVertices, app = $app)")
+      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
+      .set("spark.kryo.registrator", "org.apache.spark.graphx.GraphKryoRegistrator")
+
+    val sc = new SparkContext(conf)
+
+    // Create the graph
+    println(s"Creating graph...")
+    val unpartitionedGraph = GraphGenerators.logNormalGraph(sc, numVertices,
+      numEPart.getOrElse(sc.defaultParallelism), mu, sigma)
+    // Repartition the graph
+    val graph = partitionStrategy.foldLeft(unpartitionedGraph)(_.partitionBy(_)).cache()
+
+    var startTime = System.currentTimeMillis()
+    val numEdges = graph.edges.count()
+    println(s"Done creating graph. Num Vertices = $numVertices, Num Edges = $numEdges")
+    val loadTime = System.currentTimeMillis() - startTime
+
+    // Collect the degree distribution (if desired)
+    if (!degFile.isEmpty) {
+      val fos = new FileOutputStream(degFile)
+      val pos = new PrintWriter(fos)
+      val hist = graph.vertices.leftJoin(graph.degrees)((id, _, optDeg) => optDeg.getOrElse(0))
+        .map(p => p._2).countByValue()
+      hist.foreach {
+        case (deg, count) => pos.println(s"$deg \t $count")
+      }
+      pos.close()
+    }
+
+    // Run the requested application (PageRank or connected components)
+    startTime = System.currentTimeMillis()
+    if (app == "pagerank") {
+      println("Running PageRank")
+      val totalPR = graph.staticPageRank(niter).vertices.map(_._2).sum()
+      println(s"Total PageRank = $totalPR")
+    } else if (app == "cc") {
+      println("Running Connected Components")
+      val numComponents = graph.connectedComponents.vertices.map(_._2).distinct().count()
+      println(s"Number of components = $numComponents")
+    }
+    val runTime = System.currentTimeMillis() - startTime
+
+    println(s"Num Vertices = $numVertices")
+    println(s"Num Edges = $numEdges")
+    println(s"Creation time = ${loadTime/1000.0} seconds")
+    println(s"Run time = ${runTime/1000.0} seconds")
+
+    sc.stop()
+  }
+}
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/PartitionStrategy.scala b/graphx/src/main/scala/org/apache/spark/graphx/PartitionStrategy.scala
index 1526ccef06fd4..ef412cfd4e6ea 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/PartitionStrategy.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/PartitionStrategy.scala
@@ -119,4 +119,13 @@ object PartitionStrategy {
       math.abs((lower, higher).hashCode()) % numParts
     }
   }
+
+  /** Returns the PartitionStrategy with the specified name. */
+  def fromString(s: String): PartitionStrategy = s match {
+    case "RandomVertexCut" => RandomVertexCut
+    case "EdgePartition1D" => EdgePartition1D
+    case "EdgePartition2D" => EdgePartition2D
+    case "CanonicalRandomVertexCut" => CanonicalRandomVertexCut
+    case _ => throw new IllegalArgumentException("Invalid PartitionStrategy: " + s)
+  }
 }
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/util/GraphGenerators.scala b/graphx/src/main/scala/org/apache/spark/graphx/util/GraphGenerators.scala
index a3c8de3f9068f..635514f09ece0 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/util/GraphGenerators.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/util/GraphGenerators.scala
@@ -38,19 +38,42 @@ object GraphGenerators {
   val RMATa = 0.45
   val RMATb = 0.15
   val RMATd = 0.25
+
   /**
    * Generate a graph whose vertex out degree is log normal.
+   *
+   * The default values for mu and sigma are taken from the Pregel paper:
+   *
+   * Grzegorz Malewicz, Matthew H. Austern, Aart J. C. Bik, James C. Dehnert,
+   * Ilan Horn, Naty Leiser, and Grzegorz Czajkowski. 2010.
+   * Pregel: a system for large-scale graph processing. SIGMOD '10.
+   *
+   * @param sc the SparkContext in which to construct the graph
+   * @param numVertices the number of vertices in the graph
+   * @param numEParts the number of edge partitions in the generated graph
+   * @param mu the mean parameter of the log-normal out-degree distribution
+   * @param sigma the stdev parameter of the log-normal out-degree distribution
+   * @return the generated graph, with each vertex attribute set to its sampled out-degree
    */
-  def logNormalGraph(sc: SparkContext, numVertices: Int): Graph[Int, Int] = {
-    // based on Pregel settings
-    val mu = 4
-    val sigma = 1.3
-
-    val vertices: RDD[(VertexId, Int)] = sc.parallelize(0 until numVertices).map{
-      src => (src, sampleLogNormal(mu, sigma, numVertices))
+  def logNormalGraph(sc: SparkContext, numVertices: Int, numEParts: Int,
+      mu: Double = 4.0, sigma: Double = 1.3): Graph[Long, Int] = {
+    val vertices = sc.parallelize(0 until numVertices, numEParts).map { src =>
+      // Initialize the random number generator with the source vertex id
+      val rand = new Random(src)
+      val degree = math.min(numVertices.toLong, math.exp(rand.nextGaussian() * sigma + mu).toLong)
+      (src.toLong, degree)
     }
-    val edges = vertices.flatMap { v =>
-      generateRandomEdges(v._1.toInt, v._2, numVertices)
+    val edges = vertices.flatMap { case (src, degree) =>
+      new Iterator[Edge[Int]] {
+        // Initialize the random number generator with the source vertex id
+        val rand = new Random(src)
+        var i = 0
+        override def hasNext(): Boolean = { i < degree }
+        override def next(): Edge[Int] = {
+          val nextEdge = Edge[Int](src, rand.nextInt(numVertices), i)
+          i += 1
+          nextEdge
+        }
+      }
     }
     Graph(vertices, edges, 0)
   }
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index ecb389de5558f..fc9cbeaec6473 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -35,7 +35,8 @@ object MimaExcludes {
     val excludes =
       SparkBuild.SPARK_VERSION match {
         case v if v.startsWith("1.1") =>
-          Seq()
+          Seq(
+            MimaBuild.excludeSparkPackage("graphx"))
         case v if v.startsWith("1.0") =>
           Seq(
             MimaBuild.excludeSparkPackage("api.java"),
@@ -58,4 +59,3 @@
         case _ => Seq()
       }
 }
-
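As a rough illustration of how the pieces added in this patch fit together, the sketch below drives GraphGenerators.logNormalGraph, PartitionStrategy.fromString, and staticPageRank directly, much as SynthBenchmark does. It is not part of the patch, and the local master URL, vertex count, partition count, and partition strategy name are made-up example values.

// Illustrative sketch only: exercises the APIs introduced in this patch.
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._  // provides sum() on RDD[Double] in Spark 1.x
import org.apache.spark.graphx.PartitionStrategy
import org.apache.spark.graphx.util.GraphGenerators

object LogNormalGraphSketch {
  def main(args: Array[String]) {
    val sc = new SparkContext(
      new SparkConf().setMaster("local[4]").setAppName("LogNormalGraphSketch"))

    // Build a small synthetic graph: 10000 vertices, 4 edge partitions, and the
    // Pregel-paper defaults mu = 4.0, sigma = 1.3 for the log-normal out-degree.
    val graph = GraphGenerators.logNormalGraph(sc, numVertices = 10000, numEParts = 4)
      .partitionBy(PartitionStrategy.fromString("EdgePartition2D"))
      .cache()

    // Run 10 iterations of static PageRank, as SynthBenchmark does for -app=pagerank.
    val totalPR = graph.staticPageRank(10).vertices.map(_._2).sum()
    println(s"Num Edges = ${graph.edges.count()}, Total PageRank = $totalPR")

    sc.stop()
  }
}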