Skip to content

Commit

Permalink
Revert "[SPARK-2208] Fix for local metrics tests can fail on fast mac…
Browse files Browse the repository at this point in the history
…hines". The test appears to still be flaky after this change, or more flaky.

This reverts commit 5519760.
  • Loading branch information
srowen committed Mar 24, 2016
1 parent 5519760 commit 342079d
Showing 1 changed file with 15 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,6 @@ import org.scalatest.Matchers

import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkException, SparkFunSuite}
import org.apache.spark.executor.TaskMetrics
import org.apache.spark.network.buffer.ManagedBuffer
import org.apache.spark.shuffle.IndexShuffleBlockResolver
import org.apache.spark.shuffle.sort.SortShuffleManager
import org.apache.spark.storage.ShuffleBlockId
import org.apache.spark.util.{ResetSystemProperties, RpcUtils}

class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Matchers
Expand Down Expand Up @@ -219,24 +215,28 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
}

test("local metrics") {
val conf = new SparkConf()
.setMaster("local").setAppName("SparkListenerSuite")
.set("spark.shuffle.manager", classOf[SlowShuffleManager].getName)
sc = new SparkContext(conf)
sc = new SparkContext("local", "SparkListenerSuite")
val listener = new SaveStageAndTaskInfo
sc.addSparkListener(listener)
sc.addSparkListener(new StatsReportListener)
// just to make sure some of the tasks take a noticeable amount of time
val w = { i: Int =>
if (i == 0) {
Thread.sleep(100)
}
i
}

val numSlices = 16
val d = sc.parallelize(0 to 1e3.toInt, numSlices)
val d = sc.parallelize(0 to 1e3.toInt, numSlices).map(w)
d.count()
sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
listener.stageInfos.size should be (1)

val d2 = d.map { i => i -> i * 2 }.setName("shuffle input 1")
val d3 = d.map { i => i -> (0 to (i % 5)) }.setName("shuffle input 2")
val d2 = d.map { i => w(i) -> i * 2 }.setName("shuffle input 1")
val d3 = d.map { i => w(i) -> (0 to (i % 5)) }.setName("shuffle input 2")
val d4 = d2.cogroup(d3, numSlices).map { case (k, (v1, v2)) =>
k -> (v1.size, v2.size)
w(k) -> (v1.size, v2.size)
}
d4.setName("A Cogroup")
d4.collectAsMap()
Expand All @@ -255,11 +255,13 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
taskInfoMetrics.map(_._2.executorDeserializeTime),
stageInfo + " executorDeserializeTime")

/* Test is disabled (SEE SPARK-2208)
if (stageInfo.rddInfos.exists(_.name == d4.name)) {
checkNonZeroAvg(
taskInfoMetrics.map(_._2.shuffleReadMetrics.get.fetchWaitTime),
stageInfo + " fetchWaitTime")
}
*/

taskInfoMetrics.foreach { case (taskInfo, taskMetrics) =>
taskMetrics.resultSize should be > (0L)
Expand Down Expand Up @@ -335,7 +337,7 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
listener.wait(remainingWait)
remainingWait = finishTime - System.currentTimeMillis
}
assert(listener.startedTasks.nonEmpty)
assert(!listener.startedTasks.isEmpty)
}

f.cancel()
Expand Down Expand Up @@ -474,15 +476,3 @@ private class ListenerThatAcceptsSparkConf(conf: SparkConf) extends SparkListene
var count = 0
override def onJobEnd(job: SparkListenerJobEnd): Unit = count += 1
}

/** Slow ShuffleManager to simulate tasks that takes a noticeable amount of time */
private class SlowShuffleManager(conf: SparkConf) extends SortShuffleManager(conf) {

override val shuffleBlockResolver = new IndexShuffleBlockResolver(conf) {

override def getBlockData(blockId: ShuffleBlockId): ManagedBuffer = {
Thread.sleep(10)
super.getBlockData(blockId)
}
}
}

0 comments on commit 342079d

Please sign in to comment.