From cf7b115496d6847100fee82ce5a1435b52109538 Mon Sep 17 00:00:00 2001 From: Ali Ghodsi Date: Mon, 2 Sep 2013 18:21:21 -0700 Subject: [PATCH] Enabling getting the actual WEBUI port --- .../org/apache/spark/deploy/DeployMessage.scala | 4 ++++ .../apache/spark/deploy/LocalSparkCluster.scala | 2 +- .../org/apache/spark/deploy/master/Master.scala | 17 ++++++++++++++--- 3 files changed, 19 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala index c31619db279dd..14a453699d3a1 100644 --- a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala +++ b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala @@ -127,4 +127,8 @@ private[deploy] object DeployMessages { case object CheckForWorkerTimeOut + case object RequestWebUIPort + + case class WebUIPortResponse(boundedPort: Int) {} + } diff --git a/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala b/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala index 78e3747ad8166..10161c82041b9 100644 --- a/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala +++ b/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala @@ -43,7 +43,7 @@ class LocalSparkCluster(numWorkers: Int, coresPerWorker: Int, memoryPerWorker: I logInfo("Starting a local Spark cluster with " + numWorkers + " workers.") /* Start the Master */ - val (masterSystem, masterPort) = Master.startSystemAndActor(localHostname, 0, 0) + val (masterSystem, masterPort, _) = Master.startSystemAndActor(localHostname, 0, 0) masterActorSystems += masterSystem val masterUrl = "spark://" + localHostname + ":" + masterPort diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index 7cf0a7754f78c..50d5900ecd433 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -26,6 +26,8 @@ import akka.actor._ import akka.actor.Terminated import akka.remote.{RemoteClientLifeCycleEvent, RemoteClientDisconnected, RemoteClientShutdown} import akka.util.duration._ +import akka.dispatch.Await +import akka.pattern.ask import org.apache.spark.{Logging, SparkException} import org.apache.spark.deploy.{ApplicationDescription, ExecutorState} @@ -33,6 +35,8 @@ import org.apache.spark.deploy.DeployMessages._ import org.apache.spark.deploy.master.ui.MasterWebUI import org.apache.spark.metrics.MetricsSystem import org.apache.spark.util.{Utils, AkkaUtils} +import akka.util.Timeout +import java.util.concurrent.TimeUnit private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Actor with Logging { @@ -180,6 +184,10 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act case CheckForWorkerTimeOut => { timeOutDeadWorkers() } + + case RequestWebUIPort => { + sender ! WebUIPortResponse(webUi.boundPort.getOrElse(-1)) + } } /** @@ -364,7 +372,7 @@ private[spark] object Master { def main(argStrings: Array[String]) { val args = new MasterArguments(argStrings) - val (actorSystem, _) = startSystemAndActor(args.host, args.port, args.webUiPort) + val (actorSystem, _, _) = startSystemAndActor(args.host, args.port, args.webUiPort) actorSystem.awaitTermination() } @@ -378,9 +386,12 @@ private[spark] object Master { } } - def startSystemAndActor(host: String, port: Int, webUiPort: Int): (ActorSystem, Int) = { + def startSystemAndActor(host: String, port: Int, webUiPort: Int): (ActorSystem, Int, Int) = { val (actorSystem, boundPort) = AkkaUtils.createActorSystem(systemName, host, port) val actor = actorSystem.actorOf(Props(new Master(host, boundPort, webUiPort)), name = actorName) - (actorSystem, boundPort) + implicit val timeout = Timeout(1 seconds) + val respFuture = actor ? RequestWebUIPort + val resp = Await.result(respFuture, timeout.duration).asInstanceOf[WebUIPortResponse] + (actorSystem, boundPort, resp.boundedPort) } }