Fix ClusterScheduler bug to avoid allocating tasks to same slave
xiajunluan committed May 16, 2013
1 parent 8436bd5 commit c6e2770
Showing 5 changed files with 75 additions and 50 deletions.
48 changes: 28 additions & 20 deletions core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala
@@ -164,27 +164,35 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
// Build a list of tasks to assign to each slave
val tasks = offers.map(o => new ArrayBuffer[TaskDescription](o.cores))
val availableCpus = offers.map(o => o.cores).toArray
for (i <- 0 until offers.size) {
var launchedTask = true
val execId = offers(i).executorId
val host = offers(i).hostname
while (availableCpus(i) > 0 && launchedTask) {
var launchedTask = false
val sortedLeafSchedulable = rootPool.getSortedLeafSchedulable()
for (schedulable <- sortedLeafSchedulable)
{
logDebug("parentName:%s,name:%s,runningTasks:%s".format(schedulable.parent.name,schedulable.name,schedulable.runningTasks))
}
for (schedulable <- sortedLeafSchedulable) {
do {
launchedTask = false
rootPool.receiveOffer(execId,host,availableCpus(i)) match {
case Some(task) =>
tasks(i) += task
val tid = task.taskId
taskIdToTaskSetId(tid) = task.taskSetId
taskSetTaskIds(task.taskSetId) += tid
taskIdToExecutorId(tid) = execId
activeExecutorIds += execId
executorsByHost(host) += execId
availableCpus(i) -= 1
launchedTask = true

case None => {}
}
}
for (i <- 0 until offers.size) {
var launchedTask = true
val execId = offers(i).executorId
val host = offers(i).hostname
schedulable.slaveOffer(execId,host,availableCpus(i)) match {
case Some(task) =>
tasks(i) += task
val tid = task.taskId
taskIdToTaskSetId(tid) = task.taskSetId
taskSetTaskIds(task.taskSetId) += tid
taskIdToExecutorId(tid) = execId
activeExecutorIds += execId
executorsByHost(host) += execId
availableCpus(i) -= 1
launchedTask = true

case None => {}
}
}
} while(launchedTask)
}
if (tasks.size > 0) {
hasLaunchedTask = true
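
Read together, the two versions of the loop above differ in their nesting. The removed code walked the offers in the outer loop and kept offering the same executor to the pool until its CPUs ran out, so one slave could absorb an entire task set; the added code walks the sorted leaf schedulables in the outer loop and, inside a do/while, sweeps across all offers, launching at most one task per offer per sweep, which spreads a task set across slaves. Below is a minimal, self-contained sketch of the new shape; Offer, Task, TaskQueue, and RoundRobinSketch are simplified stand-ins invented for illustration, not Spark's WorkerOffer, TaskDescription, or TaskSetManager, and the real code also maintains task/executor bookkeeping omitted here.

import scala.collection.mutable.ArrayBuffer

// Simplified stand-ins for the scheduler's types; illustrative only.
case class Offer(executorId: String, host: String, var freeCpus: Int)
case class Task(taskSetId: String, index: Int)

// A toy "leaf schedulable" that hands out tasks one at a time until it is empty.
class TaskQueue(val id: String, private var remaining: Int) {
  def slaveOffer(execId: String, host: String, freeCpus: Int): Option[Task] =
    if (remaining > 0 && freeCpus >= 1) { remaining -= 1; Some(Task(id, remaining)) }
    else None
}

object RoundRobinSketch {
  // Mirrors the new loop shape: for each schedulable, repeatedly sweep over *all*
  // offers, taking at most one task per offer per sweep, until nothing launches.
  def assign(queues: Seq[TaskQueue], offers: Seq[Offer]): Map[String, Seq[Task]] = {
    val assigned = offers.map(o => o.executorId -> new ArrayBuffer[Task]).toMap
    for (queue <- queues) {
      var launchedTask = false
      do {
        launchedTask = false
        for (offer <- offers) {
          queue.slaveOffer(offer.executorId, offer.host, offer.freeCpus) match {
            case Some(task) =>
              assigned(offer.executorId) += task
              offer.freeCpus -= 1
              launchedTask = true
            case None =>
          }
        }
      } while (launchedTask)
    }
    assigned.map { case (exec, buf) => exec -> buf.toSeq }
  }

  def main(args: Array[String]): Unit = {
    val offers = Seq(Offer("exec1", "host1", 4), Offer("exec2", "host2", 4))
    val result = assign(Seq(new TaskQueue("taskset0", 4)), offers)
    // With the removed offer-first nesting, exec1 (4 free CPUs) would have taken all
    // four tasks; the sweep interleaves them, two per executor.
    result.toSeq.sortBy(_._1).foreach { case (exec, tasks) => println(s"$exec -> $tasks") }
  }
}
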
20 changes: 9 additions & 11 deletions core/src/main/scala/spark/scheduler/cluster/Pool.scala
@@ -75,19 +75,17 @@ private[spark] class Pool(
return shouldRevive
}

override def receiveOffer(execId: String, host: String, availableCpus: Double): Option[TaskDescription] = {
override def slaveOffer(execId: String, host: String, availableCpus: Double): Option[TaskDescription] = {
return None
}

override def getSortedLeafSchedulable(): ArrayBuffer[Schedulable] = {
var leafSchedulableQueue = new ArrayBuffer[Schedulable]
val sortedSchedulableQueue = schedulableQueue.sortWith(taskSetSchedulingAlgorithm.comparator)
for (manager <- sortedSchedulableQueue) {
logInfo("parentName:%s,schedulableName:%s,minShares:%d,weight:%d,runningTasks:%d".format(
manager.parent.name, manager.name, manager.minShare, manager.weight, manager.runningTasks))
for (schedulable <- sortedSchedulableQueue) {
leafSchedulableQueue ++= schedulable.getSortedLeafSchedulable()
}
for (manager <- sortedSchedulableQueue) {
val task = manager.receiveOffer(execId, host, availableCpus)
if (task != None) {
return task
}
}
return None
return leafSchedulableQueue
}

override def increaseRunningTasks(taskNum: Int) {
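
With this change a Pool no longer answers offers itself: slaveOffer on a pool returns None, and getSortedLeafSchedulable flattens the pool's subtree, sorted by its scheduling algorithm's comparator, into the list of leaf schedulables that ClusterScheduler then drives directly. Here is a rough standalone sketch of that flattening over a toy tree, assuming a simple priority-based comparator; Node, Leaf, Branch, and sortedLeaves are invented names, not Spark's Pool, TaskSetManager, or SchedulingAlgorithm.

import scala.collection.mutable.ArrayBuffer

// Toy composite hierarchy standing in for Pool (Branch) and TaskSetManager (Leaf).
sealed trait Node {
  def name: String
  def priority: Int
  def sortedLeaves: ArrayBuffer[Node]
}

case class Leaf(name: String, priority: Int) extends Node {
  // A leaf is its own one-element leaf list, like TaskSetManager.getSortedLeafSchedulable.
  def sortedLeaves: ArrayBuffer[Node] = ArrayBuffer[Node](this)
}

case class Branch(name: String, priority: Int, children: Seq[Node]) extends Node {
  // Sort the children first (the real pool uses its comparator; here, priority),
  // then recurse, so every leaf of the subtree appears in scheduling order.
  def sortedLeaves: ArrayBuffer[Node] = {
    val out = new ArrayBuffer[Node]
    for (child <- children.sortBy(_.priority)) out ++= child.sortedLeaves
    out
  }
}

object PoolFlattenSketch {
  def main(args: Array[String]): Unit = {
    val root = Branch("root", 0, Seq(
      Branch("poolB", 2, Seq(Leaf("taskset2", 1), Leaf("taskset3", 2))),
      Leaf("taskset1", 1)))
    // Prints the leaves in scheduling order: taskset1, taskset2, taskset3
    println(root.sortedLeaves.map(_.name).mkString(", "))
  }
}
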
core/src/main/scala/spark/scheduler/cluster/Schedulable.scala
@@ -21,6 +21,7 @@ private[spark] trait Schedulable {
def removeSchedulable(schedulable: Schedulable): Unit
def getSchedulableByName(name: String): Schedulable
def executorLost(executorId: String, host: String): Unit
def receiveOffer(execId: String, host: String, avaiableCpus: Double): Option[TaskDescription]
def slaveOffer(execId: String, host: String, avaiableCpus: Double): Option[TaskDescription]
def checkSpeculatableTasks(): Boolean
def getSortedLeafSchedulable(): ArrayBuffer[Schedulable]
}
core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala
@@ -198,7 +198,7 @@ private[spark] class TaskSetManager(
}

// Respond to an offer of a single slave from the scheduler by finding a task
override def receiveOffer(execId: String, host: String, availableCpus: Double): Option[TaskDescription] = {
override def slaveOffer(execId: String, host: String, availableCpus: Double): Option[TaskDescription] = {
if (tasksFinished < numTasks && availableCpus >= CPUS_PER_TASK) {
val time = System.currentTimeMillis
val localOnly = (time - lastPreferredLaunchTime < LOCALITY_WAIT)
@@ -398,6 +398,12 @@ private[spark] class TaskSetManager(
//nothing
}

override def getSortedLeafSchedulable(): ArrayBuffer[Schedulable] = {
var leafSchedulableQueue = new ArrayBuffer[Schedulable]
leafSchedulableQueue += this
return leafSchedulableQueue
}

override def executorLost(execId: String, hostname: String) {
logInfo("Re-queueing tasks for " + execId + " from TaskSet " + taskSet.id)
val newHostsAlive = sched.hostsAlive
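
In TaskSetManager the visible changes are the rename from receiveOffer to slaveOffer and a new getSortedLeafSchedulable that returns the manager itself as a one-element list; the locality handling shown as context in the slaveOffer excerpt (placement stays local-only until LOCALITY_WAIT has elapsed since the last preferred-location launch) is untouched. For orientation, a tiny sketch of that delay-scheduling gate follows; LocalityGate, allowNonLocal, and recordPreferredLaunch are invented names, not the real TaskSetManager fields.

// Illustrative only: a minimal delay-scheduling gate in the spirit of the
// LOCALITY_WAIT check in slaveOffer above.
class LocalityGate(localityWaitMs: Long) {
  private var lastPreferredLaunchTime = System.currentTimeMillis

  // Non-local placement is allowed only once the wait window since the last
  // preferred-location launch has expired.
  def allowNonLocal(now: Long): Boolean = now - lastPreferredLaunchTime >= localityWaitMs

  def recordPreferredLaunch(now: Long): Unit = { lastPreferredLaunchTime = now }
}

object LocalityGateSketch {
  def main(args: Array[String]): Unit = {
    val gate = new LocalityGate(localityWaitMs = 3000L)
    val now = System.currentTimeMillis
    gate.recordPreferredLaunch(now) // pretend a node-local task just launched
    println(s"non-local allowed immediately: ${gate.allowNonLocal(now)}")        // false
    println(s"non-local allowed after 5s:    ${gate.allowNonLocal(now + 5000)}") // true
  }
}
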
46 changes: 29 additions & 17 deletions core/src/test/scala/spark/scheduler/ClusterSchedulerSuite.scala
@@ -6,6 +6,7 @@ import org.scalatest.BeforeAndAfter
import spark._
import spark.scheduler._
import spark.scheduler.cluster._
import scala.collection.mutable.ArrayBuffer

import java.util.Properties

@@ -25,45 +26,51 @@ class DummyTaskSetManager(
var numTasks = initNumTasks
var tasksFinished = 0

def increaseRunningTasks(taskNum: Int) {
override def increaseRunningTasks(taskNum: Int) {
runningTasks += taskNum
if (parent != null) {
parent.increaseRunningTasks(taskNum)
}
}

def decreaseRunningTasks(taskNum: Int) {
override def decreaseRunningTasks(taskNum: Int) {
runningTasks -= taskNum
if (parent != null) {
parent.decreaseRunningTasks(taskNum)
}
}

def addSchedulable(schedulable: Schedulable) {
override def addSchedulable(schedulable: Schedulable) {
}

def removeSchedulable(schedulable: Schedulable) {
override def removeSchedulable(schedulable: Schedulable) {
}

def getSchedulableByName(name: String): Schedulable = {
override def getSchedulableByName(name: String): Schedulable = {
return null
}

def executorLost(executorId: String, host: String): Unit = {
override def executorLost(executorId: String, host: String): Unit = {
}

def receiveOffer(execId: String, host: String, avaiableCpus: Double): Option[TaskDescription] = {
override def slaveOffer(execId: String, host: String, avaiableCpus: Double): Option[TaskDescription] = {
if (tasksFinished + runningTasks < numTasks) {
increaseRunningTasks(1)
return Some(new TaskDescription(0, stageId.toString, execId, "task 0:0", null))
}
return None
}

def checkSpeculatableTasks(): Boolean = {
override def checkSpeculatableTasks(): Boolean = {
return true
}

override def getSortedLeafSchedulable(): ArrayBuffer[Schedulable] = {
var leafSchedulableQueue = new ArrayBuffer[Schedulable]
leafSchedulableQueue += this
return leafSchedulableQueue
}

def taskFinished() {
decreaseRunningTasks(1)
tasksFinished +=1
@@ -80,16 +87,21 @@

class ClusterSchedulerSuite extends FunSuite with BeforeAndAfter {

def receiveOffer(rootPool: Pool) : Option[TaskDescription] = {
rootPool.receiveOffer("execId_1", "hostname_1", 1)
def resourceOffer(rootPool: Pool): Int = {
val taskSetQueue = rootPool.getSortedLeafSchedulable()
for (taskSet <- taskSetQueue)
{
taskSet.slaveOffer("execId_1", "hostname_1", 1) match {
case Some(task) =>
return task.taskSetId.toInt
case None => {}
}
}
-1
}

def checkTaskSetId(rootPool: Pool, expectedTaskSetId: Int) {
receiveOffer(rootPool) match {
case Some(task) =>
assert(task.taskSetId.toInt === expectedTaskSetId)
case _ =>
}
assert(resourceOffer(rootPool) === expectedTaskSetId)
}

test("FIFO Scheduler Test") {
@@ -105,9 +117,9 @@ class ClusterSchedulerSuite extends FunSuite with BeforeAndAfter {
schedulableBuilder.addTaskSetManager(taskSetManager2, null)

checkTaskSetId(rootPool, 0)
receiveOffer(rootPool)
resourceOffer(rootPool)
checkTaskSetId(rootPool, 1)
receiveOffer(rootPool)
resourceOffer(rootPool)
taskSetManager1.abort()
checkTaskSetId(rootPool, 2)
}
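
The FIFO test drives the scheduler one single-CPU offer at a time through the new resourceOffer helper and asserts which task set answers; the expected order of task sets 0, 1, and 2 comes from the comparator rootPool uses when sorting its children. The sketch below shows the usual FIFO rule of this kind, lower priority value first with ties broken by stage id; JobSpec and fifoLessThan are invented stand-ins and may differ in detail from the suite's actual FIFOSchedulingAlgorithm.

// Illustrative FIFO ordering: earlier job (lower priority value) first,
// ties broken by stage id. JobSpec is a stand-in type, not a Spark class.
case class JobSpec(priority: Int, stageId: Int)

object FifoOrderSketch {
  def fifoLessThan(a: JobSpec, b: JobSpec): Boolean =
    if (a.priority != b.priority) a.priority < b.priority else a.stageId < b.stageId

  def main(args: Array[String]): Unit = {
    val queue = Seq(JobSpec(priority = 1, stageId = 1),
                    JobSpec(priority = 0, stageId = 2),
                    JobSpec(priority = 0, stageId = 0))
    // Sorted scheduling order: (0,0), (0,2), (1,1) -- the lowest-numbered stage
    // of the oldest job is offered resources first.
    println(queue.sortWith(fifoLessThan).mkString(", "))
  }
}
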
