[SPARK-6287] [MESOS] Add dynamic allocation to the coarse-grained Mesos scheduler

This is largely based on extracting the dynamic allocation parts from tnachen's apache#3861.

Author: Iulian Dragos <[email protected]>

Closes apache#4984 from dragos/issue/mesos-coarse-dynamicAllocation and squashes the following commits:

39df8cd [Iulian Dragos] Update tests to latest changes in core.
9d2c9fa [Iulian Dragos] Remove adjustment of executorLimitOption in doKillExecutors.
8b00f52 [Iulian Dragos] Latest round of reviews.
0cd00e0 [Iulian Dragos] Add persistent shuffle directory
15c45c1 [Iulian Dragos] Add dynamic allocation to the Spark coarse-grained scheduler.
dragos authored and Andrew Or committed Jul 9, 2015
1 parent ebdf585 commit c483059
Showing 6 changed files with 331 additions and 56 deletions.
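
For orientation, here is a minimal sketch of how an application might opt into the behaviour this commit enables. The configuration keys below are the standard coarse-grained Mesos and dynamic-allocation settings rather than anything introduced by this diff, and the master URL, app name, and executor bounds are placeholders.

import org.apache.spark.{SparkConf, SparkContext}

object DynamicAllocationOnMesosSketch {
  def main(args: Array[String]): Unit = {
    // Hypothetical driver setup: coarse-grained Mesos mode plus the standard
    // dynamic-allocation settings. An external shuffle service must be running
    // on each slave so executors can be removed without losing shuffle files.
    val conf = new SparkConf()
      .setMaster("mesos://zk://zk1:2181/mesos") // placeholder master URL
      .setAppName("dynamic-allocation-sketch")
      .set("spark.mesos.coarse", "true")
      .set("spark.dynamicAllocation.enabled", "true")
      .set("spark.shuffle.service.enabled", "true")
      .set("spark.dynamicAllocation.minExecutors", "1")
      .set("spark.dynamicAllocation.maxExecutors", "10")

    val sc = new SparkContext(conf)
    try {
      // Executors are requested and released by the ExecutorAllocationManager
      // based on the task backlog instead of being held for the app's lifetime.
      println(sc.parallelize(1 to 1000, 8).map(_ * 2).sum())
    } finally {
      sc.stop()
    }
  }
}
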
19 changes: 11 additions & 8 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -532,7 +532,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
_executorAllocationManager =
if (dynamicAllocationEnabled) {
assert(supportDynamicAllocation,
"Dynamic allocation of executors is currently only supported in YARN mode")
"Dynamic allocation of executors is currently only supported in YARN and Mesos mode")
Some(new ExecutorAllocationManager(this, listenerBus, _conf))
} else {
None
@@ -853,7 +853,6 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
minPartitions).setName(path)
}


/**
* :: Experimental ::
*
@@ -1364,10 +1363,14 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli

/**
* Return whether dynamically adjusting the amount of resources allocated to
* this application is supported. This is currently only available for YARN.
* this application is supported. This is currently only available for YARN
* and Mesos coarse-grained mode.
*/
private[spark] def supportDynamicAllocation =
master.contains("yarn") || _conf.getBoolean("spark.dynamicAllocation.testing", false)
private[spark] def supportDynamicAllocation: Boolean = {
(master.contains("yarn")
|| master.contains("mesos")
|| _conf.getBoolean("spark.dynamicAllocation.testing", false))
}

/**
* :: DeveloperApi ::
@@ -1385,7 +1388,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
*/
private[spark] override def requestTotalExecutors(numExecutors: Int): Boolean = {
assert(supportDynamicAllocation,
"Requesting executors is currently only supported in YARN mode")
"Requesting executors is currently only supported in YARN and Mesos modes")
schedulerBackend match {
case b: CoarseGrainedSchedulerBackend =>
b.requestTotalExecutors(numExecutors)
@@ -1403,7 +1406,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
@DeveloperApi
override def requestExecutors(numAdditionalExecutors: Int): Boolean = {
assert(supportDynamicAllocation,
"Requesting executors is currently only supported in YARN mode")
"Requesting executors is currently only supported in YARN and Mesos modes")
schedulerBackend match {
case b: CoarseGrainedSchedulerBackend =>
b.requestExecutors(numAdditionalExecutors)
@@ -1421,7 +1424,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
@DeveloperApi
override def killExecutors(executorIds: Seq[String]): Boolean = {
assert(supportDynamicAllocation,
"Killing executors is currently only supported in YARN mode")
"Killing executors is currently only supported in YARN and Mesos modes")
schedulerBackend match {
case b: CoarseGrainedSchedulerBackend =>
b.killExecutors(executorIds)
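
The assertion messages above now admit Mesos for the executor-scaling hooks that were previously YARN-only. A rough, illustrative sketch follows; the executor ID is an invented sample of the slaveId/taskId format introduced later in this commit, sc is assumed to be backed by the coarse-grained Mesos scheduler, and requestTotalExecutors stays private[spark], driven by the ExecutorAllocationManager rather than user code.

import org.apache.spark.SparkContext

object ManualScalingSketch {
  // Illustrative only: the @DeveloperApi calls whose guards this diff relaxes.
  def scaleByHand(sc: SparkContext): Unit = {
    val grew: Boolean = sc.requestExecutors(numAdditionalExecutors = 2)
    val shrank: Boolean = sc.killExecutors(Seq("20150709-000123-0001-S1/3"))
    println(s"requestExecutors acked: $grew, killExecutors acked: $shrank")
  }
}
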
core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
@@ -18,11 +18,14 @@
package org.apache.spark.scheduler.cluster.mesos

import java.io.File
import java.util.{List => JList}
import java.util.{List => JList, Collections}
import java.util.concurrent.locks.ReentrantLock

import scala.collection.JavaConversions._
import scala.collection.mutable.{HashMap, HashSet}

import com.google.common.collect.HashBiMap
import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, _}
import org.apache.mesos.{Scheduler => MScheduler, _}
import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, _}
import org.apache.spark.{SparkContext, SparkEnv, SparkException, TaskState}
@@ -60,9 +63,27 @@ private[spark] class CoarseMesosSchedulerBackend(

val slaveIdsWithExecutors = new HashSet[String]

val taskIdToSlaveId = new HashMap[Int, String]
val failuresBySlaveId = new HashMap[String, Int] // How many times tasks on each slave failed
val taskIdToSlaveId: HashBiMap[Int, String] = HashBiMap.create[Int, String]
// How many times tasks on each slave failed
val failuresBySlaveId: HashMap[String, Int] = new HashMap[String, Int]

/**
* The total number of executors we aim to have. Undefined when not using dynamic allocation
* and before the ExecutorAllocationManager calls [[doRequestTotalExecutors]].
*/
private var executorLimitOption: Option[Int] = None

/**
* Return the current executor limit, which may be [[Int.MaxValue]]
* before it is properly initialized.
*/
private[mesos] def executorLimit: Int = executorLimitOption.getOrElse(Int.MaxValue)

private val pendingRemovedSlaveIds = new HashSet[String]

// Private lock object protecting the mutable state above. Using the intrinsic lock
// may lead to deadlocks, since the superclass might also try to lock.
private val stateLock = new ReentrantLock

val extraCoresPerSlave = conf.getInt("spark.mesos.extra.cores", 0)

@@ -86,7 +107,7 @@ private[spark] class CoarseMesosSchedulerBackend(
startScheduler(master, CoarseMesosSchedulerBackend.this, fwInfo)
}

def createCommand(offer: Offer, numCores: Int): CommandInfo = {
def createCommand(offer: Offer, numCores: Int, taskId: Int): CommandInfo = {
val executorSparkHome = conf.getOption("spark.mesos.executor.home")
.orElse(sc.getSparkHome())
.getOrElse {
@@ -120,10 +141,6 @@ private[spark] class CoarseMesosSchedulerBackend(
}
val command = CommandInfo.newBuilder()
.setEnvironment(environment)
val driverUrl = sc.env.rpcEnv.uriOf(
SparkEnv.driverActorSystemName,
RpcAddress(conf.get("spark.driver.host"), conf.get("spark.driver.port").toInt),
CoarseGrainedSchedulerBackend.ENDPOINT_NAME)

val uri = conf.getOption("spark.executor.uri")
.orElse(Option(System.getenv("SPARK_EXECUTOR_URI")))
@@ -133,7 +150,7 @@ private[spark] class CoarseMesosSchedulerBackend(
command.setValue(
"%s \"%s\" org.apache.spark.executor.CoarseGrainedExecutorBackend"
.format(prefixEnv, runScript) +
s" --driver-url $driverUrl" +
s" --driver-url $driverURL" +
s" --executor-id ${offer.getSlaveId.getValue}" +
s" --hostname ${offer.getHostname}" +
s" --cores $numCores" +
@@ -142,11 +159,12 @@ private[spark] class CoarseMesosSchedulerBackend(
// Grab everything to the first '.'. We'll use that and '*' to
// glob the directory "correctly".
val basename = uri.get.split('/').last.split('.').head
val executorId = sparkExecutorId(offer.getSlaveId.getValue, taskId.toString)
command.setValue(
s"cd $basename*; $prefixEnv " +
"./bin/spark-class org.apache.spark.executor.CoarseGrainedExecutorBackend" +
s" --driver-url $driverUrl" +
s" --executor-id ${offer.getSlaveId.getValue}" +
s" --driver-url $driverURL" +
s" --executor-id $executorId" +
s" --hostname ${offer.getHostname}" +
s" --cores $numCores" +
s" --app-id $appId")
@@ -155,6 +173,17 @@ private[spark] class CoarseMesosSchedulerBackend(
command.build()
}

protected def driverURL: String = {
if (conf.contains("spark.testing")) {
"driverURL"
} else {
sc.env.rpcEnv.uriOf(
SparkEnv.driverActorSystemName,
RpcAddress(conf.get("spark.driver.host"), conf.get("spark.driver.port").toInt),
CoarseGrainedSchedulerBackend.ENDPOINT_NAME)
}
}

override def offerRescinded(d: SchedulerDriver, o: OfferID) {}

override def registered(d: SchedulerDriver, frameworkId: FrameworkID, masterInfo: MasterInfo) {
@@ -172,17 +201,18 @@ private[spark] class CoarseMesosSchedulerBackend(
* unless we've already launched more than we wanted to.
*/
override def resourceOffers(d: SchedulerDriver, offers: JList[Offer]) {
synchronized {
stateLock.synchronized {
val filters = Filters.newBuilder().setRefuseSeconds(5).build()
for (offer <- offers) {
val offerAttributes = toAttributeMap(offer.getAttributesList)
val meetsConstraints = matchesAttributeRequirements(slaveOfferConstraints, offerAttributes)
val slaveId = offer.getSlaveId.toString
val slaveId = offer.getSlaveId.getValue
val mem = getResource(offer.getResourcesList, "mem")
val cpus = getResource(offer.getResourcesList, "cpus").toInt
val id = offer.getId.getValue
if (meetsConstraints &&
if (taskIdToSlaveId.size < executorLimit &&
totalCoresAcquired < maxCores &&
meetsConstraints &&
mem >= calculateTotalMemory(sc) &&
cpus >= 1 &&
failuresBySlaveId.getOrElse(slaveId, 0) < MAX_SLAVE_FAILURES &&
@@ -197,7 +227,7 @@ private[spark] class CoarseMesosSchedulerBackend(
val task = MesosTaskInfo.newBuilder()
.setTaskId(TaskID.newBuilder().setValue(taskId.toString).build())
.setSlaveId(offer.getSlaveId)
.setCommand(createCommand(offer, cpusToUse + extraCoresPerSlave))
.setCommand(createCommand(offer, cpusToUse + extraCoresPerSlave, taskId))
.setName("Task " + taskId)
.addResources(createResource("cpus", cpusToUse))
.addResources(createResource("mem", calculateTotalMemory(sc)))
@@ -209,7 +239,9 @@ private[spark] class CoarseMesosSchedulerBackend(

// accept the offer and launch the task
logDebug(s"Accepting offer: $id with attributes: $offerAttributes mem: $mem cpu: $cpus")
d.launchTasks(List(offer.getId), List(task.build()), filters)
d.launchTasks(
Collections.singleton(offer.getId),
Collections.singleton(task.build()), filters)
} else {
// Decline the offer
logDebug(s"Declining offer: $id with attributes: $offerAttributes mem: $mem cpu: $cpus")
@@ -224,7 +256,7 @@ private[spark] class CoarseMesosSchedulerBackend(
val taskId = status.getTaskId.getValue.toInt
val state = status.getState
logInfo("Mesos task " + taskId + " is now " + state)
synchronized {
stateLock.synchronized {
if (TaskState.isFinished(TaskState.fromMesos(state))) {
val slaveId = taskIdToSlaveId(taskId)
slaveIdsWithExecutors -= slaveId
@@ -242,8 +274,9 @@ private[spark] class CoarseMesosSchedulerBackend(
"is Spark installed on it?")
}
}
executorTerminated(d, slaveId, s"Executor finished with state $state")
// In case we'd rejected everything before but have now lost a node
mesosDriver.reviveOffers()
d.reviveOffers()
}
}
}
@@ -262,18 +295,39 @@ private[spark] class CoarseMesosSchedulerBackend(

override def frameworkMessage(d: SchedulerDriver, e: ExecutorID, s: SlaveID, b: Array[Byte]) {}

override def slaveLost(d: SchedulerDriver, slaveId: SlaveID) {
logInfo("Mesos slave lost: " + slaveId.getValue)
synchronized {
if (slaveIdsWithExecutors.contains(slaveId.getValue)) {
// Note that the slave ID corresponds to the executor ID on that slave
slaveIdsWithExecutors -= slaveId.getValue
removeExecutor(slaveId.getValue, "Mesos slave lost")
/**
* Called when a slave is lost or a Mesos task finishes. Updates the local view of
* which tasks are running, removes the terminated slave from the list of pending
* slave IDs that we might have asked to be killed, and notifies the driver
* that an executor was removed.
*/
private def executorTerminated(d: SchedulerDriver, slaveId: String, reason: String): Unit = {
stateLock.synchronized {
if (slaveIdsWithExecutors.contains(slaveId)) {
val slaveIdToTaskId = taskIdToSlaveId.inverse()
if (slaveIdToTaskId.contains(slaveId)) {
val taskId: Int = slaveIdToTaskId.get(slaveId)
taskIdToSlaveId.remove(taskId)
removeExecutor(sparkExecutorId(slaveId, taskId.toString), reason)
}
// TODO: This assumes one Spark executor per Mesos slave,
// which may no longer be true after SPARK-5095
pendingRemovedSlaveIds -= slaveId
slaveIdsWithExecutors -= slaveId
}
}
}

override def executorLost(d: SchedulerDriver, e: ExecutorID, s: SlaveID, status: Int) {
private def sparkExecutorId(slaveId: String, taskId: String): String = {
s"$slaveId/$taskId"
}

override def slaveLost(d: SchedulerDriver, slaveId: SlaveID): Unit = {
logInfo("Mesos slave lost: " + slaveId.getValue)
executorTerminated(d, slaveId.getValue, "Mesos slave lost: " + slaveId.getValue)
}

override def executorLost(d: SchedulerDriver, e: ExecutorID, s: SlaveID, status: Int): Unit = {
logInfo("Executor lost: %s, marking slave %s as lost".format(e.getValue, s.getValue))
slaveLost(d, s)
}
@@ -284,4 +338,34 @@ private[spark] class CoarseMesosSchedulerBackend(
super.applicationId
}

override def doRequestTotalExecutors(requestedTotal: Int): Boolean = {
// We don't truly know whether we can fulfill the requested number of executors,
// since in coarse-grained mode it depends on the number of slaves available.
logInfo("Capping the total amount of executors to " + requestedTotal)
executorLimitOption = Some(requestedTotal)
true
}

override def doKillExecutors(executorIds: Seq[String]): Boolean = {
if (mesosDriver == null) {
logWarning("Asked to kill executors before the Mesos driver was started.")
return false
}

val slaveIdToTaskId = taskIdToSlaveId.inverse()
for (executorId <- executorIds) {
val slaveId = executorId.split("/")(0)
if (slaveIdToTaskId.contains(slaveId)) {
mesosDriver.killTask(
TaskID.newBuilder().setValue(slaveIdToTaskId.get(slaveId).toString).build())
pendingRemovedSlaveIds += slaveId
} else {
logWarning("Unable to find executor Id '" + executorId + "' in Mesos scheduler")
}
}
// no need to adjust `executorLimitOption` since the AllocationManager already communicated
// the desired limit through a call to `doRequestTotalExecutors`.
// See [[o.a.s.scheduler.cluster.CoarseGrainedSchedulerBackend.killExecutors]]
true
}
}
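
As a toy illustration, separate from the patch itself, of the bookkeeping this backend now keeps: taskIdToSlaveId is a Guava HashBiMap, so doKillExecutors can walk from a Spark executor ID of the form slaveId/taskId back to the Mesos task that should be killed. The IDs below are invented.

import com.google.common.collect.HashBiMap

object ExecutorIdMappingSketch {
  def main(args: Array[String]): Unit = {
    // Forward direction: each launched Mesos task is recorded against its slave.
    val taskIdToSlaveId: HashBiMap[Int, String] = HashBiMap.create[Int, String]()
    taskIdToSlaveId.put(3, "20150709-000123-0001-S1") // invented slave ID

    // Spark-side executor ID, mirroring sparkExecutorId(slaveId, taskId).
    val sparkExecutorId = s"${taskIdToSlaveId.get(3)}/3"

    // Reverse direction, as in doKillExecutors: executor ID -> slave ID -> task ID.
    val slaveId = sparkExecutorId.split("/")(0)
    val mesosTaskId = taskIdToSlaveId.inverse().get(slaveId)
    println(s"executor $sparkExecutorId maps back to Mesos task $mesosTaskId")
  }
}
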
core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
@@ -24,7 +24,7 @@ import scala.collection.JavaConversions._
import scala.util.control.NonFatal

import com.google.common.base.Splitter
import org.apache.mesos.{MesosSchedulerDriver, Protos, Scheduler}
import org.apache.mesos.{MesosSchedulerDriver, SchedulerDriver, Scheduler, Protos}
import org.apache.mesos.Protos._
import org.apache.mesos.protobuf.GeneratedMessage
import org.apache.spark.{Logging, SparkContext}
@@ -39,7 +39,7 @@ private[mesos] trait MesosSchedulerUtils extends Logging {
private final val registerLatch = new CountDownLatch(1)

// Driver for talking to Mesos
protected var mesosDriver: MesosSchedulerDriver = null
protected var mesosDriver: SchedulerDriver = null

/**
* Starts the MesosSchedulerDriver with the provided information. This method returns
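
Widening mesosDriver from the concrete MesosSchedulerDriver to the SchedulerDriver interface presumably makes the backend easier to exercise with a stand-in driver in tests. A minimal sketch, assuming Mockito is on the test classpath; the object and values are invented for illustration.

import org.apache.mesos.Protos.TaskID
import org.apache.mesos.SchedulerDriver
import org.mockito.Mockito.{mock, verify}

object MockDriverSketch {
  def main(args: Array[String]): Unit = {
    // A Mockito stand-in satisfies the SchedulerDriver interface, so it can be
    // assigned to the mesosDriver field without a live Mesos cluster.
    val driver: SchedulerDriver = mock(classOf[SchedulerDriver])

    // Something like doKillExecutors would issue this through the backend; it
    // is called directly here only to show the interaction being verified.
    driver.killTask(TaskID.newBuilder().setValue("3").build())
    verify(driver).killTask(TaskID.newBuilder().setValue("3").build())
  }
}
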
core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
@@ -124,10 +124,16 @@ private[spark] class DiskBlockManager(blockManager: BlockManager, conf: SparkCon
(blockId, getFile(blockId))
}

/**
* Create local directories for storing block data. These directories are
* located inside configured local directories and won't
* be deleted on JVM exit when using the external shuffle service.
*/
private def createLocalDirs(conf: SparkConf): Array[File] = {
Utils.getOrCreateLocalRootDirs(conf).flatMap { rootDir =>
Utils.getConfiguredLocalDirs(conf).flatMap { rootDir =>
try {
val localDir = Utils.createDirectory(rootDir, "blockmgr")
Utils.chmod700(localDir)
logInfo(s"Created local directory at $localDir")
Some(localDir)
} catch {
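
A small deployment sketch for the behaviour described in the new comment: give executors a stable scratch location and enable the external shuffle service, so the blockmgr-* directories created under the configured local directories outlive the executor JVM and a removed executor's shuffle files remain servable. The path is an example and the keys are standard Spark settings, not anything prescribed by this change.

import org.apache.spark.SparkConf

object PersistentShuffleDirsSketch {
  // Example settings only; spark.local.dirs points at a directory that survives
  // executor exit, and the external shuffle service serves files written there.
  val conf: SparkConf = new SparkConf()
    .set("spark.local.dirs", "/var/lib/spark/tmp")
    .set("spark.shuffle.service.enabled", "true")
    .set("spark.dynamicAllocation.enabled", "true")
}
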