Skip to content

Commit

Permalink
Synthetic GraphX Benchmark
Browse files Browse the repository at this point in the history
This PR accomplishes two things:

1. It introduces a Synthetic Benchmark application that generates an arbitrarily large log-normal graph and executes either PageRank or connected components on the graph.  This can be used to profile GraphX system on arbitrary clusters without access to large graph datasets

2. This PR improves the implementation of the log-normal graph generator.

Author: Joseph E. Gonzalez <[email protected]>
Author: Ankur Dave <[email protected]>

Closes apache#720 from jegonzal/graphx_synth_benchmark and squashes the following commits:

e40812a [Ankur Dave] Exclude all of GraphX from compatibility checks vs. 1.0.0
bccccad [Ankur Dave] Fix long lines
374678a [Ankur Dave] Bugfix and style changes
1bdf39a [Joseph E. Gonzalez] updating options
d943972 [Joseph E. Gonzalez] moving the benchmark application into the examples folder.
f4f839a [Joseph E. Gonzalez] Creating a synthetic benchmark script.
  • Loading branch information
jegonzal authored and ankurdave committed Jun 3, 2014
1 parent aa41a52 commit 894ecde
Show file tree
Hide file tree
Showing 4 changed files with 171 additions and 11 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.examples.graphx

import org.apache.spark.SparkContext._
import org.apache.spark.graphx.PartitionStrategy
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.graphx.util.GraphGenerators
import java.io.{PrintWriter, FileOutputStream}

/**
* The SynthBenchmark application can be used to run various GraphX algorithms on
* synthetic log-normal graphs. The intent of this code is to enable users to
* profile the GraphX system without access to large graph datasets.
*/
object SynthBenchmark {

/**
* To run this program use the following:
*
* MASTER=spark://foobar bin/run-example graphx.SynthBenchmark -app=pagerank
*
* Options:
* -app "pagerank" or "cc" for pagerank or connected components. (Default: pagerank)
* -niters the number of iterations of pagerank to use (Default: 10)
* -numVertices the number of vertices in the graph (Default: 1000000)
* -numEPart the number of edge partitions in the graph (Default: number of cores)
* -partStrategy the graph partitioning strategy to use
* -mu the mean parameter for the log-normal graph (Default: 4.0)
* -sigma the stdev parameter for the log-normal graph (Default: 1.3)
* -degFile the local file to save the degree information (Default: Empty)
*/
def main(args: Array[String]) {
val options = args.map {
arg =>
arg.dropWhile(_ == '-').split('=') match {
case Array(opt, v) => (opt -> v)
case _ => throw new IllegalArgumentException("Invalid argument: " + arg)
}
}

var app = "pagerank"
var niter = 10
var numVertices = 100000
var numEPart: Option[Int] = None
var partitionStrategy: Option[PartitionStrategy] = None
var mu: Double = 4.0
var sigma: Double = 1.3
var degFile: String = ""

options.foreach {
case ("app", v) => app = v
case ("niter", v) => niter = v.toInt
case ("nverts", v) => numVertices = v.toInt
case ("numEPart", v) => numEPart = Some(v.toInt)
case ("partStrategy", v) => partitionStrategy = Some(PartitionStrategy.fromString(v))
case ("mu", v) => mu = v.toDouble
case ("sigma", v) => sigma = v.toDouble
case ("degFile", v) => degFile = v
case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt)
}

val conf = new SparkConf()
.setAppName(s"GraphX Synth Benchmark (nverts = $numVertices, app = $app)")
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.set("spark.kryo.registrator", "org.apache.spark.graphx.GraphKryoRegistrator")

val sc = new SparkContext(conf)

// Create the graph
println(s"Creating graph...")
val unpartitionedGraph = GraphGenerators.logNormalGraph(sc, numVertices,
numEPart.getOrElse(sc.defaultParallelism), mu, sigma)
// Repartition the graph
val graph = partitionStrategy.foldLeft(unpartitionedGraph)(_.partitionBy(_)).cache()

var startTime = System.currentTimeMillis()
val numEdges = graph.edges.count()
println(s"Done creating graph. Num Vertices = $numVertices, Num Edges = $numEdges")
val loadTime = System.currentTimeMillis() - startTime

// Collect the degree distribution (if desired)
if (!degFile.isEmpty) {
val fos = new FileOutputStream(degFile)
val pos = new PrintWriter(fos)
val hist = graph.vertices.leftJoin(graph.degrees)((id, _, optDeg) => optDeg.getOrElse(0))
.map(p => p._2).countByValue()
hist.foreach {
case (deg, count) => pos.println(s"$deg \t $count")
}
}

// Run PageRank
startTime = System.currentTimeMillis()
if (app == "pagerank") {
println("Running PageRank")
val totalPR = graph.staticPageRank(niter).vertices.map(_._2).sum()
println(s"Total PageRank = $totalPR")
} else if (app == "cc") {
println("Running Connected Components")
val numComponents = graph.connectedComponents.vertices.map(_._2).distinct()
println(s"Number of components = $numComponents")
}
val runTime = System.currentTimeMillis() - startTime

println(s"Num Vertices = $numVertices")
println(s"Num Edges = $numEdges")
println(s"Creation time = ${loadTime/1000.0} seconds")
println(s"Run time = ${runTime/1000.0} seconds")

sc.stop()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -119,4 +119,13 @@ object PartitionStrategy {
math.abs((lower, higher).hashCode()) % numParts
}
}

/** Returns the PartitionStrategy with the specified name. */
def fromString(s: String): PartitionStrategy = s match {
case "RandomVertexCut" => RandomVertexCut
case "EdgePartition1D" => EdgePartition1D
case "EdgePartition2D" => EdgePartition2D
case "CanonicalRandomVertexCut" => CanonicalRandomVertexCut
case _ => throw new IllegalArgumentException("Invalid PartitionStrategy: " + s)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,19 +38,42 @@ object GraphGenerators {
val RMATa = 0.45
val RMATb = 0.15
val RMATd = 0.25

/**
* Generate a graph whose vertex out degree is log normal.
*
* The default values for mu and sigma are taken from the Pregel paper:
*
* Grzegorz Malewicz, Matthew H. Austern, Aart J.C Bik, James C. Dehnert,
* Ilan Horn, Naty Leiser, and Grzegorz Czajkowski. 2010.
* Pregel: a system for large-scale graph processing. SIGMOD '10.
*
* @param sc
* @param numVertices
* @param mu
* @param sigma
* @return
*/
def logNormalGraph(sc: SparkContext, numVertices: Int): Graph[Int, Int] = {
// based on Pregel settings
val mu = 4
val sigma = 1.3

val vertices: RDD[(VertexId, Int)] = sc.parallelize(0 until numVertices).map{
src => (src, sampleLogNormal(mu, sigma, numVertices))
def logNormalGraph(sc: SparkContext, numVertices: Int, numEParts: Int,
mu: Double = 4.0, sigma: Double = 1.3): Graph[Long, Int] = {
val vertices = sc.parallelize(0 until numVertices, numEParts).map { src =>
// Initialize the random number generator with the source vertex id
val rand = new Random(src)
val degree = math.min(numVertices.toLong, math.exp(rand.nextGaussian() * sigma + mu).toLong)
(src.toLong, degree)
}
val edges = vertices.flatMap { v =>
generateRandomEdges(v._1.toInt, v._2, numVertices)
val edges = vertices.flatMap { case (src, degree) =>
new Iterator[Edge[Int]] {
// Initialize the random number generator with the source vertex id
val rand = new Random(src)
var i = 0
override def hasNext(): Boolean = { i < degree }
override def next(): Edge[Int] = {
val nextEdge = Edge[Int](src, rand.nextInt(numVertices), i)
i += 1
nextEdge
}
}
}
Graph(vertices, edges, 0)
}
Expand Down
4 changes: 2 additions & 2 deletions project/MimaExcludes.scala
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ object MimaExcludes {
val excludes =
SparkBuild.SPARK_VERSION match {
case v if v.startsWith("1.1") =>
Seq()
Seq(
MimaBuild.excludeSparkPackage("graphx"))
case v if v.startsWith("1.0") =>
Seq(
MimaBuild.excludeSparkPackage("api.java"),
Expand All @@ -58,4 +59,3 @@ object MimaExcludes {
case _ => Seq()
}
}

0 comments on commit 894ecde

Please sign in to comment.