[SPARK-15107][SQL] Allow varying # iterations by test case in Benchmark
## What changes were proposed in this pull request?
This patch changes our micro-benchmark util to allow setting different iteration numbers for different test cases. For some of our benchmarks, turning off whole-stage codegen can make the runtime 20X slower, making it very difficult to run those cases a large number of times without substantially shortening the input cardinality.

With this change, I set the default number of iterations to 2 when whole-stage codegen is off and 5 when it is on. I also updated some of the benchmark results.
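
For illustration, a caller can now write the following (the case names and workload helpers are hypothetical; the `addCase` signature and the iteration defaults come from this patch):

```scala
// defaultNumIters (5 unless overridden at construction) applies to any case
// that does not pass an explicit numIters.
val benchmark = new Benchmark("Aggregate w keys", valuesPerIteration = 20L << 22)

// Slow variant: run only 2 iterations.
benchmark.addCase("codegen = F", numIters = 2) { iter =>
  runSlowVariant()  // hypothetical workload
}

// Fast variant: omit numIters to fall back to the default of 5.
benchmark.addCase("codegen = T") { iter =>
  runFastVariant()  // hypothetical workload
}

benchmark.run()
```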

## How was this patch tested?
N/A - this is a test util.

Author: Reynold Xin <[email protected]>

Closes apache#12884 from rxin/SPARK-15107.
rxin committed May 4, 2016
1 parent 348c138 commit 695f0e9
Showing 3 changed files with 93 additions and 67 deletions.
21 changes: 10 additions & 11 deletions core/src/main/scala/org/apache/spark/util/Benchmark.scala
@@ -38,16 +38,16 @@ import org.apache.commons.lang3.SystemUtils
 private[spark] class Benchmark(
     name: String,
     valuesPerIteration: Long,
-    iters: Int = 5,
+    defaultNumIters: Int = 5,
     outputPerIteration: Boolean = false) {
   val benchmarks = mutable.ArrayBuffer.empty[Benchmark.Case]

   /**
    * Adds a case to run when run() is called. The given function will be run for several
    * iterations to collect timing statistics.
    */
-  def addCase(name: String)(f: Int => Unit): Unit = {
-    addTimerCase(name) { timer =>
+  def addCase(name: String, numIters: Int = 0)(f: Int => Unit): Unit = {
+    addTimerCase(name, numIters) { timer =>
       timer.startTiming()
       f(timer.iteration)
       timer.stopTiming()
@@ -59,8 +59,8 @@ private[spark] class Benchmark(
    * until timer.startTiming() is called within the given function. The corresponding
    * timer.stopTiming() method must be called before the function returns.
    */
-  def addTimerCase(name: String)(f: Benchmark.Timer => Unit): Unit = {
-    benchmarks += Benchmark.Case(name, f)
+  def addTimerCase(name: String, numIters: Int = 0)(f: Benchmark.Timer => Unit): Unit = {
+    benchmarks += Benchmark.Case(name, f, if (numIters == 0) defaultNumIters else numIters)
   }

   /**
@@ -75,20 +75,19 @@

     val results = benchmarks.map { c =>
       println("  Running case: " + c.name)
-      Benchmark.measure(valuesPerIteration, iters, outputPerIteration)(c.fn)
+      Benchmark.measure(valuesPerIteration, c.numIters, outputPerIteration)(c.fn)
     }
     println

     val firstBest = results.head.bestMs
     // The results are going to be processor specific so it is useful to include that.
     println(Benchmark.getJVMOSInfo())
     println(Benchmark.getProcessorName())
-    printf("%-35s %16s %12s %13s %10s\n", name + ":", "Best/Avg Time(ms)", "Rate(M/s)",
+    printf("%-40s %16s %12s %13s %10s\n", name + ":", "Best/Avg Time(ms)", "Rate(M/s)",
       "Per Row(ns)", "Relative")
-    println("-----------------------------------------------------------------------------------" +
-      "--------")
+    println("-" * 96)
     results.zip(benchmarks).foreach { case (result, benchmark) =>
-      printf("%-35s %16s %12s %13s %10s\n",
+      printf("%-40s %16s %12s %13s %10s\n",
        benchmark.name,
        "%5.0f / %4.0f" format (result.bestMs, result.avgMs),
        "%10.1f" format result.bestRate,
@@ -128,7 +127,7 @@ private[spark] object Benchmark {
     }
   }

-  case class Case(name: String, fn: Timer => Unit)
+  case class Case(name: String, fn: Timer => Unit, numIters: Int)
   case class Result(avgMs: Double, bestRate: Double, bestMs: Double)

   /**
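`addTimerCase` accepts the same optional `numIters` and additionally lets a case exclude setup work from the measurement. A minimal sketch, assuming hypothetical `prepareInput` and `sortBuffer` helpers:

```scala
benchmark.addTimerCase("radix sort", numIters = 2) { timer =>
  val buf = prepareInput()  // hypothetical setup; runs before timing starts
  timer.startTiming()
  sortBuffer(buf)           // hypothetical workload under test
  timer.stopTiming()
}
```

Note that the `Relative` column printed by `run()` divides the first case's best time by each case's best time (`firstBest / result.bestMs`); for example, in the range/filter/sum results further below, 30663 ms / 2399 ms ≈ 12.8X.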
core/src/test/scala/org/apache/spark/util/collection/unsafe/sort/RadixSortSuite.scala
@@ -244,7 +244,7 @@ class RadixSortSuite extends SparkFunSuite with Logging {
         RadixSort.sortKeyPrefixArray(buf2, size, 0, 7, false, false)
         timer.stopTiming()
       }
-      benchmark.run
+      benchmark.run()

       /**
         Running benchmark: radix sort 25000000
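The one-line change above is purely stylistic: in Scala, methods with side effects are conventionally declared and invoked with parentheses, so `benchmark.run()` is preferred over the parameterless `benchmark.run`.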
sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala
@@ -36,6 +36,8 @@ import org.apache.spark.util.Benchmark
  * Benchmark to measure whole stage codegen performance.
  * To run this:
  *   build/sbt "sql/test-only *BenchmarkWholeStageCodegen"
+ *
+ * Benchmarks in this file are skipped in normal builds.
  */
 class BenchmarkWholeStageCodegen extends SparkFunSuite {
   lazy val conf = new SparkConf().setMaster("local[1]").setAppName("benchmark")
@@ -44,31 +46,50 @@ class BenchmarkWholeStageCodegen extends SparkFunSuite {
   lazy val sc = SparkContext.getOrCreate(conf)
   lazy val sqlContext = SQLContext.getOrCreate(sc)

-  def runBenchmark(name: String, values: Long)(f: => Unit): Unit = {
-    val benchmark = new Benchmark(name, values)
+  /** Runs function `f` with whole stage codegen on and off. */
+  def runBenchmark(name: String, cardinality: Long)(f: => Unit): Unit = {
+    val benchmark = new Benchmark(name, cardinality)

-    Seq(false, true).foreach { enabled =>
-      benchmark.addCase(s"$name codegen=$enabled") { iter =>
-        sqlContext.setConf("spark.sql.codegen.wholeStage", enabled.toString)
-        f
-      }
-    }
+    benchmark.addCase(s"$name wholestage off", numIters = 2) { iter =>
+      sqlContext.setConf("spark.sql.codegen.wholeStage", "false")
+      f
+    }
+
+    benchmark.addCase(s"$name wholestage on", numIters = 5) { iter =>
+      sqlContext.setConf("spark.sql.codegen.wholeStage", "true")
+      f
+    }

     benchmark.run()
   }

-  // These benchmark are skipped in normal build
-  ignore("range/filter/sum") {
-    val N = 500L << 20
-    runBenchmark("rang/filter/sum", N) {
+  ignore("aggregate without grouping") {
+    val N = 500L << 22
+    val benchmark = new Benchmark("agg without grouping", N)
+    runBenchmark("agg w/o group", N) {
+      sqlContext.range(N).selectExpr("sum(id)").collect()
+    }
+    /*
+    agg w/o group:                           Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
+    ------------------------------------------------------------------------------------------------
+    agg w/o group wholestage off                 30136 / 31885         69.6          14.4       1.0X
+    agg w/o group wholestage on                   1851 / 1860        1132.9           0.9      16.3X
+    */
+  }
+
+  ignore("filter & aggregate without group") {
+    val N = 500L << 22
+    runBenchmark("range/filter/sum", N) {
       sqlContext.range(N).filter("(id & 1) = 1").groupBy().sum().collect()
     }
     /*
-    Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz
-    rang/filter/sum:                    Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
-    -------------------------------------------------------------------------------------------
-    rang/filter/sum codegen=false           14332 / 16646         36.0          27.8       1.0X
-    rang/filter/sum codegen=true              897 / 1022         584.6           1.7      16.4X
+    Java HotSpot(TM) 64-Bit Server VM 1.8.0_60-b27 on Mac OS X 10.11
+    Intel(R) Core(TM) i7-4960HQ CPU @ 2.60GHz
+    range/filter/sum:                        Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
+    ------------------------------------------------------------------------------------------------
+    range/filter/sum codegen=false               30663 / 31216         68.4          14.6       1.0X
+    range/filter/sum codegen=true                 2399 / 2409         874.1           1.1      12.8X
     */
   }

@@ -86,28 +107,32 @@ class BenchmarkWholeStageCodegen extends SparkFunSuite {
     */
   }

-  ignore("range/sample/sum") {
-    val N = 500 << 20
-    runBenchmark("range/sample/sum", N) {
-      sqlContext.range(N).sample(true, 0.01).groupBy().sum().collect()
+  ignore("sample") {
+    val N = 500 << 18
+    runBenchmark("sample with replacement", N) {
+      sqlContext.range(N).sample(withReplacement = true, 0.01).groupBy().sum().collect()
     }
     /*
-    Westmere E56xx/L56xx/X56xx (Nehalem-C)
-    range/sample/sum:                   Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
-    -------------------------------------------------------------------------------------------
-    range/sample/sum codegen=false          53888 / 56592          9.7         102.8       1.0X
-    range/sample/sum codegen=true           41614 / 42607         12.6          79.4       1.3X
+    Java HotSpot(TM) 64-Bit Server VM 1.8.0_60-b27 on Mac OS X 10.11
+    Intel(R) Core(TM) i7-4960HQ CPU @ 2.60GHz
+    sample with replacement:                 Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
+    ------------------------------------------------------------------------------------------------
+    sample with replacement codegen=false         7073 / 7227          18.5          54.0       1.0X
+    sample with replacement codegen=true          5199 / 5203          25.2          39.7       1.4X
     */

-    runBenchmark("range/sample/sum", N) {
-      sqlContext.range(N).sample(false, 0.01).groupBy().sum().collect()
+    runBenchmark("sample without replacement", N) {
+      sqlContext.range(N).sample(withReplacement = false, 0.01).groupBy().sum().collect()
     }
     /*
-    Westmere E56xx/L56xx/X56xx (Nehalem-C)
-    range/sample/sum:                   Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
-    -------------------------------------------------------------------------------------------
-    range/sample/sum codegen=false          12982 / 13384         40.4          24.8       1.0X
-    range/sample/sum codegen=true            7074 / 7383          74.1          13.5       1.8X
+    Java HotSpot(TM) 64-Bit Server VM 1.8.0_60-b27 on Mac OS X 10.11
+    Intel(R) Core(TM) i7-4960HQ CPU @ 2.60GHz
+    sample without replacement:              Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
+    ------------------------------------------------------------------------------------------------
+    sample without replacement codegen=false      1508 / 1529          86.9          11.5       1.0X
+    sample without replacement codegen=true        644 / 662          203.5           4.9       2.3X
     */
   }

@@ -151,23 +176,23 @@
   }

   ignore("aggregate with linear keys") {
-    val N = 20 << 20
+    val N = 20 << 22

     val benchmark = new Benchmark("Aggregate w keys", N)
     def f(): Unit = sqlContext.range(N).selectExpr("(id & 65535) as k").groupBy("k").sum().collect()

-    benchmark.addCase(s"codegen = F") { iter =>
+    benchmark.addCase(s"codegen = F", numIters = 2) { iter =>
       sqlContext.setConf("spark.sql.codegen.wholeStage", "false")
       f()
     }

-    benchmark.addCase(s"codegen = T hashmap = F") { iter =>
+    benchmark.addCase(s"codegen = T hashmap = F", numIters = 3) { iter =>
       sqlContext.setConf("spark.sql.codegen.wholeStage", "true")
       sqlContext.setConf("spark.sql.codegen.aggregate.map.columns.max", "0")
       f()
     }

-    benchmark.addCase(s"codegen = T hashmap = T") { iter =>
+    benchmark.addCase(s"codegen = T hashmap = T", numIters = 5) { iter =>
       sqlContext.setConf("spark.sql.codegen.wholeStage", "true")
       sqlContext.setConf("spark.sql.codegen.aggregate.map.columns.max", "3")
       f()
@@ -176,36 +201,37 @@
     benchmark.run()

     /*
-    Java HotSpot(TM) 64-Bit Server VM 1.8.0_73-b02 on Mac OS X 10.11.4
+    Java HotSpot(TM) 64-Bit Server VM 1.8.0_60-b27 on Mac OS X 10.11
     Intel(R) Core(TM) i7-4960HQ CPU @ 2.60GHz
-    Aggregate w keys:                   Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
-    -------------------------------------------------------------------------------------------
-    codegen = F                              2067 / 2166          10.1          98.6       1.0X
-    codegen = T hashmap = F                  1149 / 1321          18.3          54.8       1.8X
-    codegen = T hashmap = T                   388 / 475           54.0          18.5       5.3X
+    Aggregate w keys:                        Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
+    ------------------------------------------------------------------------------------------------
+    codegen = F                                   6619 / 6780          12.7          78.9       1.0X
+    codegen = T hashmap = F                       3935 / 4059          21.3          46.9       1.7X
+    codegen = T hashmap = T                        897 / 971           93.5          10.7       7.4X
     */
   }

   ignore("aggregate with randomized keys") {
-    val N = 20 << 20
+    val N = 20 << 22

     val benchmark = new Benchmark("Aggregate w keys", N)
     sqlContext.range(N).selectExpr("id", "floor(rand() * 10000) as k").registerTempTable("test")

     def f(): Unit = sqlContext.sql("select k, k, sum(id) from test group by k, k").collect()

-    benchmark.addCase(s"codegen = F") { iter =>
+    benchmark.addCase(s"codegen = F", numIters = 2) { iter =>
       sqlContext.setConf("spark.sql.codegen.wholeStage", "false")
       f()
     }

-    benchmark.addCase(s"codegen = T hashmap = F") { iter =>
+    benchmark.addCase(s"codegen = T hashmap = F", numIters = 3) { iter =>
       sqlContext.setConf("spark.sql.codegen.wholeStage", "true")
       sqlContext.setConf("spark.sql.codegen.aggregate.map.columns.max", "0")
       f()
     }

-    benchmark.addCase(s"codegen = T hashmap = T") { iter =>
+    benchmark.addCase(s"codegen = T hashmap = T", numIters = 5) { iter =>
       sqlContext.setConf("spark.sql.codegen.wholeStage", "true")
       sqlContext.setConf("spark.sql.codegen.aggregate.map.columns.max", "3")
       f()
@@ -214,13 +240,14 @@
     benchmark.run()

     /*
-    Java HotSpot(TM) 64-Bit Server VM 1.8.0_73-b02 on Mac OS X 10.11.4
+    Java HotSpot(TM) 64-Bit Server VM 1.8.0_60-b27 on Mac OS X 10.11
     Intel(R) Core(TM) i7-4960HQ CPU @ 2.60GHz
-    Aggregate w keys:                   Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
-    -------------------------------------------------------------------------------------------
-    codegen = F                              2517 / 2608           8.3         120.0       1.0X
-    codegen = T hashmap = F                  1484 / 1560          14.1          70.8       1.7X
-    codegen = T hashmap = T                   794 / 908           26.4          37.9       3.2X
+    Aggregate w keys:                        Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
+    ------------------------------------------------------------------------------------------------
+    codegen = F                                   7445 / 7517          11.3          88.7       1.0X
+    codegen = T hashmap = F                       4672 / 4703          18.0          55.7       1.6X
+    codegen = T hashmap = T                       1764 / 1958          47.6          21.0       4.2X
     */
   }

@@ -231,18 +258,18 @@ class BenchmarkWholeStageCodegen extends SparkFunSuite {
     def f(): Unit = sqlContext.range(N).selectExpr("id", "cast(id & 1023 as string) as k")
       .groupBy("k").count().collect()

-    benchmark.addCase(s"codegen = F") { iter =>
+    benchmark.addCase(s"codegen = F", numIters = 2) { iter =>
       sqlContext.setConf("spark.sql.codegen.wholeStage", "false")
       f()
     }

-    benchmark.addCase(s"codegen = T hashmap = F") { iter =>
+    benchmark.addCase(s"codegen = T hashmap = F", numIters = 3) { iter =>
       sqlContext.setConf("spark.sql.codegen.wholeStage", "true")
       sqlContext.setConf("spark.sql.codegen.aggregate.map.columns.max", "0")
       f()
     }

-    benchmark.addCase(s"codegen = T hashmap = T") { iter =>
+    benchmark.addCase(s"codegen = T hashmap = T", numIters = 5) { iter =>
       sqlContext.setConf("spark.sql.codegen.wholeStage", "true")
       sqlContext.setConf("spark.sql.codegen.aggregate.map.columns.max", "3")
       f()
