Skip to content

Commit

Permalink
SPARK-3093 : masterLock in Worker is no longer need
Browse files Browse the repository at this point in the history
there's no need to use masterLock in Worker now since all communications are within Akka actor

Author: CrazyJvm <[email protected]>

Closes apache#2008 from CrazyJvm/no-need-master-lock and squashes the following commits:

dd39e20 [CrazyJvm] fix format
58e7fa5 [CrazyJvm] there's no need to use masterLock now since all communications are within Akka actor
  • Loading branch information
CrazyJvm authored and aarondav committed Aug 18, 2014
1 parent 9306b8c commit c0cbbde
Showing 1 changed file with 14 additions and 27 deletions.
41 changes: 14 additions & 27 deletions core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,6 @@ private[spark] class Worker(
val APP_DATA_RETENTION_SECS = conf.getLong("spark.worker.cleanup.appDataTtl", 7 * 24 * 3600)

val testing: Boolean = sys.props.contains("spark.testing")
val masterLock: Object = new Object()
var master: ActorSelection = null
var masterAddress: Address = null
var activeMasterUrl: String = ""
Expand Down Expand Up @@ -145,18 +144,16 @@ private[spark] class Worker(
}

def changeMaster(url: String, uiUrl: String) {
masterLock.synchronized {
activeMasterUrl = url
activeMasterWebUiUrl = uiUrl
master = context.actorSelection(Master.toAkkaUrl(activeMasterUrl))
masterAddress = activeMasterUrl match {
case Master.sparkUrlRegex(_host, _port) =>
Address("akka.tcp", Master.systemName, _host, _port.toInt)
case x =>
throw new SparkException("Invalid spark URL: " + x)
}
connected = true
activeMasterUrl = url
activeMasterWebUiUrl = uiUrl
master = context.actorSelection(Master.toAkkaUrl(activeMasterUrl))
masterAddress = activeMasterUrl match {
case Master.sparkUrlRegex(_host, _port) =>
Address("akka.tcp", Master.systemName, _host, _port.toInt)
case x =>
throw new SparkException("Invalid spark URL: " + x)
}
connected = true
}

def tryRegisterAllMasters() {
Expand Down Expand Up @@ -199,9 +196,7 @@ private[spark] class Worker(
}

case SendHeartbeat =>
masterLock.synchronized {
if (connected) { master ! Heartbeat(workerId) }
}
if (connected) { master ! Heartbeat(workerId) }

case WorkDirCleanup =>
// Spin up a separate thread (in a future) to do the dir cleanup; don't tie up worker actor
Expand Down Expand Up @@ -244,27 +239,21 @@ private[spark] class Worker(
manager.start()
coresUsed += cores_
memoryUsed += memory_
masterLock.synchronized {
master ! ExecutorStateChanged(appId, execId, manager.state, None, None)
}
master ! ExecutorStateChanged(appId, execId, manager.state, None, None)
} catch {
case e: Exception => {
logError("Failed to launch executor %s/%d for %s".format(appId, execId, appDesc.name))
if (executors.contains(appId + "/" + execId)) {
executors(appId + "/" + execId).kill()
executors -= appId + "/" + execId
}
masterLock.synchronized {
master ! ExecutorStateChanged(appId, execId, ExecutorState.FAILED, None, None)
}
master ! ExecutorStateChanged(appId, execId, ExecutorState.FAILED, None, None)
}
}
}

case ExecutorStateChanged(appId, execId, state, message, exitStatus) =>
masterLock.synchronized {
master ! ExecutorStateChanged(appId, execId, state, message, exitStatus)
}
master ! ExecutorStateChanged(appId, execId, state, message, exitStatus)
val fullId = appId + "/" + execId
if (ExecutorState.isFinished(state)) {
executors.get(fullId) match {
Expand Down Expand Up @@ -330,9 +319,7 @@ private[spark] class Worker(
case _ =>
logDebug(s"Driver $driverId changed state to $state")
}
masterLock.synchronized {
master ! DriverStateChanged(driverId, state, exception)
}
master ! DriverStateChanged(driverId, state, exception)
val driver = drivers.remove(driverId).get
finishedDrivers(driverId) = driver
memoryUsed -= driver.driverDesc.mem
Expand Down

0 comments on commit c0cbbde

Please sign in to comment.