Skip to content

Commit

Permalink
[SPARK-3490] Disable SparkUI for tests (backport into 1.1)
Browse files Browse the repository at this point in the history
Original PR: apache#2363

Author: Andrew Or <[email protected]>

Closes apache#2415 from andrewor14/disable-ui-for-tests-1.1 and squashes the following commits:

8d9df5a [Andrew Or] Oops, missed one.
509507d [Andrew Or] Backport apache#2363 (SPARK-3490) into branch-1.1
  • Loading branch information
andrewor14 committed Sep 17, 2014
1 parent 856156b commit 937de93
Show file tree
Hide file tree
Showing 14 changed files with 97 additions and 38 deletions.
12 changes: 9 additions & 3 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
Original file line number Diff line number Diff line change
Expand Up @@ -220,8 +220,14 @@ class SparkContext(config: SparkConf) extends Logging {
new MetadataCleaner(MetadataCleanerType.SPARK_CONTEXT, this.cleanup, conf)

// Initialize the Spark UI, registering all associated listeners
private[spark] val ui = new SparkUI(this)
ui.bind()
private[spark] val ui: Option[SparkUI] =
if (conf.getBoolean("spark.ui.enabled", true)) {
Some(new SparkUI(this))
} else {
// For tests, do not enable the UI
None
}
ui.foreach(_.bind())

/** A default Hadoop Configuration for the Hadoop code (e.g. file systems) that we reuse. */
val hadoopConfiguration: Configuration = {
Expand Down Expand Up @@ -1008,7 +1014,7 @@ class SparkContext(config: SparkConf) extends Logging {
/** Shut down the SparkContext. */
def stop() {
postApplicationEnd()
ui.stop()
ui.foreach(_.stop())
// Do this only if not stopped already - best case effort.
// prevent NPE if stopped more than once.
val dagSchedulerCopy = dagScheduler
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
logInfo(s"Add WebUI Filter. $filterName, $filterParams, $proxyBase")
conf.set("spark.ui.filters", filterName)
conf.set(s"spark.$filterName.params", filterParams)
JettyUtils.addFilters(scheduler.sc.ui.getHandlers, conf)
scheduler.sc.ui.foreach { ui => JettyUtils.addFilters(ui.getHandlers, conf) }
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,16 +46,17 @@ private[spark] class SimrSchedulerBackend(

val conf = new Configuration()
val fs = FileSystem.get(conf)
val appUIAddress = sc.ui.map(_.appUIAddress).getOrElse("")

logInfo("Writing to HDFS file: " + driverFilePath)
logInfo("Writing Akka address: " + driverUrl)
logInfo("Writing Spark UI Address: " + sc.ui.appUIAddress)
logInfo("Writing Spark UI Address: " + appUIAddress)

// Create temporary file to prevent race condition where executors get empty driverUrl file
val temp = fs.create(tmpPath, true)
temp.writeUTF(driverUrl)
temp.writeInt(maxCores)
temp.writeUTF(sc.ui.appUIAddress)
temp.writeUTF(appUIAddress)
temp.close()

// "Atomic" rename
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,10 @@ private[spark] class SparkDeploySchedulerBackend(
val javaOpts = sparkJavaOpts ++ extraJavaOpts
val command = Command("org.apache.spark.executor.CoarseGrainedExecutorBackend",
args, sc.executorEnvs, classPathEntries, libraryPathEntries, javaOpts)
val appUIAddress = sc.ui.map(_.appUIAddress).getOrElse("")
val eventLogDir = sc.eventLogger.map(_.logDir)
val appDesc = new ApplicationDescription(sc.appName, maxCores, sc.executorMemory, command,
sc.ui.appUIAddress, sc.eventLogger.map(_.logDir))
appUIAddress, eventLogDir)

client = new AppClient(sc.env.actorSystem, masters, appDesc, this, conf)
client.start()
Expand Down
44 changes: 31 additions & 13 deletions core/src/test/scala/org/apache/spark/ui/UISuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,25 @@ import scala.xml.Node

class UISuite extends FunSuite {

/**
* Create a test SparkContext with the SparkUI enabled.
* It is safe to `get` the SparkUI directly from the SparkContext returned here.
*/
private def newSparkContext(): SparkContext = {
val conf = new SparkConf()
.setMaster("local")
.setAppName("test")
.set("spark.ui.enabled", "true")
val sc = new SparkContext(conf)
assert(sc.ui.isDefined)
sc
}

ignore("basic ui visibility") {
withSpark(new SparkContext("local", "test")) { sc =>
withSpark(newSparkContext()) { sc =>
// test if the ui is visible, and all the expected tabs are visible
eventually(timeout(10 seconds), interval(50 milliseconds)) {
val html = Source.fromURL(sc.ui.appUIAddress).mkString
val html = Source.fromURL(sc.ui.get.appUIAddress).mkString
assert(!html.contains("random data that should not be present"))
assert(html.toLowerCase.contains("stages"))
assert(html.toLowerCase.contains("storage"))
Expand All @@ -50,7 +64,7 @@ class UISuite extends FunSuite {
}

ignore("visibility at localhost:4040") {
withSpark(new SparkContext("local", "test")) { sc =>
withSpark(newSparkContext()) { sc =>
// test if visible from http://localhost:4040
eventually(timeout(10 seconds), interval(50 milliseconds)) {
val html = Source.fromURL("http://localhost:4040").mkString
Expand All @@ -60,8 +74,8 @@ class UISuite extends FunSuite {
}

ignore("attaching a new tab") {
withSpark(new SparkContext("local", "test")) { sc =>
val sparkUI = sc.ui
withSpark(newSparkContext()) { sc =>
val sparkUI = sc.ui.get

val newTab = new WebUITab(sparkUI, "foo") {
attachPage(new WebUIPage("") {
Expand All @@ -72,7 +86,7 @@ class UISuite extends FunSuite {
}
sparkUI.attachTab(newTab)
eventually(timeout(10 seconds), interval(50 milliseconds)) {
val html = Source.fromURL(sc.ui.appUIAddress).mkString
val html = Source.fromURL(sparkUI.appUIAddress).mkString
assert(!html.contains("random data that should not be present"))

// check whether new page exists
Expand All @@ -86,7 +100,7 @@ class UISuite extends FunSuite {
}

eventually(timeout(10 seconds), interval(50 milliseconds)) {
val html = Source.fromURL(sc.ui.appUIAddress.stripSuffix("/") + "/foo").mkString
val html = Source.fromURL(sparkUI.appUIAddress.stripSuffix("/") + "/foo").mkString
// check whether new page exists
assert(html.contains("magic"))
}
Expand Down Expand Up @@ -125,16 +139,20 @@ class UISuite extends FunSuite {
}

test("verify appUIAddress contains the scheme") {
withSpark(new SparkContext("local", "test")) { sc =>
val uiAddress = sc.ui.appUIAddress
assert(uiAddress.equals("http://" + sc.ui.appUIHostPort))
withSpark(newSparkContext()) { sc =>
val ui = sc.ui.get
val uiAddress = ui.appUIAddress
val uiHostPort = ui.appUIHostPort
assert(uiAddress.equals("http://" + uiHostPort))
}
}

test("verify appUIAddress contains the port") {
withSpark(new SparkContext("local", "test")) { sc =>
val splitUIAddress = sc.ui.appUIAddress.split(':')
assert(splitUIAddress(2).toInt == sc.ui.boundPort)
withSpark(newSparkContext()) { sc =>
val ui = sc.ui.get
val splitUIAddress = ui.appUIAddress.split(':')
val boundPort = ui.boundPort
assert(splitUIAddress(2).toInt == boundPort)
}
}
}
1 change: 1 addition & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -887,6 +887,7 @@
<java.awt.headless>true</java.awt.headless>
<spark.test.home>${session.executionRootDirectory}</spark.test.home>
<spark.testing>1</spark.testing>
<spark.ui.enabled>false</spark.ui.enabled>
</systemProperties>
</configuration>
<executions>
Expand Down
2 changes: 1 addition & 1 deletion project/SparkBuild.scala
Original file line number Diff line number Diff line change
Expand Up @@ -337,7 +337,7 @@ object TestSettings {
javaOptions in Test += "-Dspark.test.home=" + sparkHome,
javaOptions in Test += "-Dspark.testing=1",
javaOptions in Test += "-Dspark.ports.maxRetries=100",
javaOptions in Test += "-Dspark.ui.port=0",
javaOptions in Test += "-Dspark.ui.enabled=false",
javaOptions in Test += "-Dsun.io.serialization.extendedDebugInfo=true",
javaOptions in Test ++= System.getProperties.filter(_._1 startsWith "spark")
.map { case (k,v) => s"-D$k=$v" }.toSeq,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream._
import org.apache.spark.streaming.receiver.{ActorSupervisorStrategy, ActorReceiver, Receiver}
import org.apache.spark.streaming.scheduler._
import org.apache.spark.streaming.ui.StreamingTab
import org.apache.spark.streaming.ui.{StreamingJobProgressListener, StreamingTab}
import org.apache.spark.util.MetadataCleaner

/**
Expand Down Expand Up @@ -158,7 +158,14 @@ class StreamingContext private[streaming] (

private[streaming] val waiter = new ContextWaiter

private[streaming] val uiTab = new StreamingTab(this)
private[streaming] val progressListener = new StreamingJobProgressListener(this)

private[streaming] val uiTab: Option[StreamingTab] =
if (conf.getBoolean("spark.ui.enabled", true)) {
Some(new StreamingTab(this))
} else {
None
}

/** Register streaming source to metrics system */
private val streamingSource = new StreamingSource(this)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ private[streaming] class StreamingSource(ssc: StreamingContext) extends Source {
override val metricRegistry = new MetricRegistry
override val sourceName = "%s.StreamingMetrics".format(ssc.sparkContext.appName)

private val streamingListener = ssc.uiTab.listener
private val streamingListener = ssc.progressListener

private def registerGauge[T](name: String, f: StreamingJobProgressListener => T,
defaultValue: T) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,31 @@

package org.apache.spark.streaming.ui

import org.apache.spark.Logging
import org.apache.spark.{Logging, SparkException}
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.ui.SparkUITab
import org.apache.spark.ui.{SparkUI, SparkUITab}

/** Spark Web UI tab that shows statistics of a streaming job */
import StreamingTab._

/**
* Spark Web UI tab that shows statistics of a streaming job.
* This assumes the given SparkContext has enabled its SparkUI.
*/
private[spark] class StreamingTab(ssc: StreamingContext)
extends SparkUITab(ssc.sc.ui, "streaming") with Logging {
extends SparkUITab(getSparkUI(ssc), "streaming") with Logging {

val parent = ssc.sc.ui
val listener = new StreamingJobProgressListener(ssc)
val parent = getSparkUI(ssc)
val listener = ssc.progressListener

ssc.addStreamingListener(listener)
attachPage(new StreamingPage(this))
parent.attachTab(this)
}

private object StreamingTab {
def getSparkUI(ssc: StreamingContext): SparkUI = {
ssc.sc.ui.getOrElse {
throw new SparkException("Parent SparkUI to attach this tab to not found!")
}
}
}
16 changes: 12 additions & 4 deletions streaming/src/test/scala/org/apache/spark/streaming/UISuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,22 @@ import org.scalatest.FunSuite
import org.scalatest.concurrent.Eventually._
import org.scalatest.time.SpanSugar._

import org.apache.spark.SparkConf

class UISuite extends FunSuite {

// Ignored: See SPARK-1530
ignore("streaming tab in spark UI") {
val ssc = new StreamingContext("local", "test", Seconds(1))
val conf = new SparkConf()
.setMaster("local")
.setAppName("test")
.set("spark.ui.enabled", "true")
val ssc = new StreamingContext(conf, Seconds(1))
assert(ssc.sc.ui.isDefined, "Spark UI is not started!")
val ui = ssc.sc.ui.get

eventually(timeout(10 seconds), interval(50 milliseconds)) {
val html = Source.fromURL(ssc.sparkContext.ui.appUIAddress).mkString
val html = Source.fromURL(ui.appUIAddress).mkString
assert(!html.contains("random data that should not be present"))
// test if streaming tab exist
assert(html.toLowerCase.contains("streaming"))
Expand All @@ -39,8 +48,7 @@ class UISuite extends FunSuite {
}

eventually(timeout(10 seconds), interval(50 milliseconds)) {
val html = Source.fromURL(
ssc.sparkContext.ui.appUIAddress.stripSuffix("/") + "/streaming").mkString
val html = Source.fromURL(ui.appUIAddress.stripSuffix("/") + "/streaming").mkString
assert(html.toLowerCase.contains("batch"))
assert(html.toLowerCase.contains("network"))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
assert(sparkContext != null || count >= numTries)

if (null != sparkContext) {
uiAddress = sparkContext.ui.appUIHostPort
uiAddress = sparkContext.ui.map(_.appUIHostPort).getOrElse("")
uiHistoryAddress = YarnSparkHadoopUtil.getUIHistoryAddress(sparkContext, sparkConf)
this.yarnAllocator = YarnAllocationHandler.newAllocator(
yarnConf,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,11 @@ private[spark] class YarnClientSchedulerBackend(
val driverHost = conf.get("spark.driver.host")
val driverPort = conf.get("spark.driver.port")
val hostport = driverHost + ":" + driverPort
conf.set("spark.driver.appUIAddress", sc.ui.appUIHostPort)
conf.set("spark.driver.appUIHistoryAddress", YarnSparkHadoopUtil.getUIHistoryAddress(sc, conf))
sc.ui.foreach { ui =>
conf.set("spark.driver.appUIAddress", ui.appUIHostPort)
conf.set("spark.driver.appUIHistoryAddress",
YarnSparkHadoopUtil.getUIHistoryAddress(sc, conf))
}

val argsArrayBuf = new ArrayBuffer[String]()
argsArrayBuf += (
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
assert(sparkContext != null || numTries >= maxNumTries)

if (sparkContext != null) {
uiAddress = sparkContext.ui.appUIHostPort
uiAddress = sparkContext.ui.map(_.appUIHostPort).getOrElse("")
uiHistoryAddress = YarnSparkHadoopUtil.getUIHistoryAddress(sparkContext, sparkConf)
this.yarnAllocator = YarnAllocationHandler.newAllocator(
yarnConf,
Expand Down

0 comments on commit 937de93

Please sign in to comment.