Skip to content

Commit

Permalink
Enabling getting the actual WEBUI port
Browse files Browse the repository at this point in the history
  • Loading branch information
alig committed Sep 3, 2013
1 parent 636fc0c commit cf7b115
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -127,4 +127,8 @@ private[deploy] object DeployMessages {

case object CheckForWorkerTimeOut

case object RequestWebUIPort

case class WebUIPortResponse(boundedPort: Int) {}

}
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ class LocalSparkCluster(numWorkers: Int, coresPerWorker: Int, memoryPerWorker: I
logInfo("Starting a local Spark cluster with " + numWorkers + " workers.")

/* Start the Master */
val (masterSystem, masterPort) = Master.startSystemAndActor(localHostname, 0, 0)
val (masterSystem, masterPort, _) = Master.startSystemAndActor(localHostname, 0, 0)
masterActorSystems += masterSystem
val masterUrl = "spark://" + localHostname + ":" + masterPort

Expand Down
17 changes: 14 additions & 3 deletions core/src/main/scala/org/apache/spark/deploy/master/Master.scala
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,17 @@ import akka.actor._
import akka.actor.Terminated
import akka.remote.{RemoteClientLifeCycleEvent, RemoteClientDisconnected, RemoteClientShutdown}
import akka.util.duration._
import akka.dispatch.Await
import akka.pattern.ask

import org.apache.spark.{Logging, SparkException}
import org.apache.spark.deploy.{ApplicationDescription, ExecutorState}
import org.apache.spark.deploy.DeployMessages._
import org.apache.spark.deploy.master.ui.MasterWebUI
import org.apache.spark.metrics.MetricsSystem
import org.apache.spark.util.{Utils, AkkaUtils}
import akka.util.Timeout
import java.util.concurrent.TimeUnit


private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Actor with Logging {
Expand Down Expand Up @@ -180,6 +184,10 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
case CheckForWorkerTimeOut => {
timeOutDeadWorkers()
}

case RequestWebUIPort => {
sender ! WebUIPortResponse(webUi.boundPort.getOrElse(-1))
}
}

/**
Expand Down Expand Up @@ -364,7 +372,7 @@ private[spark] object Master {

def main(argStrings: Array[String]) {
val args = new MasterArguments(argStrings)
val (actorSystem, _) = startSystemAndActor(args.host, args.port, args.webUiPort)
val (actorSystem, _, _) = startSystemAndActor(args.host, args.port, args.webUiPort)
actorSystem.awaitTermination()
}

Expand All @@ -378,9 +386,12 @@ private[spark] object Master {
}
}

def startSystemAndActor(host: String, port: Int, webUiPort: Int): (ActorSystem, Int) = {
def startSystemAndActor(host: String, port: Int, webUiPort: Int): (ActorSystem, Int, Int) = {
val (actorSystem, boundPort) = AkkaUtils.createActorSystem(systemName, host, port)
val actor = actorSystem.actorOf(Props(new Master(host, boundPort, webUiPort)), name = actorName)
(actorSystem, boundPort)
implicit val timeout = Timeout(1 seconds)
val respFuture = actor ? RequestWebUIPort
val resp = Await.result(respFuture, timeout.duration).asInstanceOf[WebUIPortResponse]
(actorSystem, boundPort, resp.boundedPort)
}
}

0 comments on commit cf7b115

Please sign in to comment.