[SPARK-11212][CORE][STREAMING] Make preferred locations support ExecutorCacheTaskLocation and update ReceiverTracker and ReceiverSchedulingPolicy to use it

This PR includes the following changes:

1. Add a new preferred location format, `executor_<host>_<executorID>` (e.g., "executor_localhost_2"), to support specifying executor locations for an RDD.
2. Use the new preferred location format in `ReceiverTracker` to speed up the start-up of receivers when a host has multiple executors.

The goal of this PR is to enable the streaming scheduler to place receivers (which run as tasks) on specific executors. Basically, I want more control over the placement of the receivers so that they are evenly distributed among the executors. We tried to do this without changing the core scheduling logic, but the existing preferred-location mechanism works only at the host level and cannot name a particular executor. So if there are two executors on the same host and I want two receivers to run on them (one on each executor), I cannot specify that; the current code only gives the host as the preference, which may end up launching both receivers on the same executor. We tried to work around this by restarting a receiver whenever it did not launch on the desired executor, hoping it would be started on the right one next time. But that causes lots of restarts, and delays in correctly launching the receiver.

So this change allows the streaming scheduler to specify the exact executor as the preferred location. It is not exposed to the user; only the streaming scheduler uses it.
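For illustration, a minimal standalone sketch of the new location-string format. The object and parse helper below are hypothetical; the real parsing lives in the private[spark] TaskLocation companion shown in the diff below:

// Hypothetical standalone sketch; mirrors the new branch in TaskLocation.apply.
object ExecutorLocationFormatSketch {
  val executorLocationTag = "executor_"

  // Parses "executor_<host>_<executorId>" into (host, executorId). Hostnames
  // cannot contain underscores (RFC 952/1123), so split("_") is unambiguous.
  def parse(str: String): (String, String) = {
    val splits = str.split("_")
    if (!str.startsWith(executorLocationTag) || splits.length != 3) {
      throw new IllegalArgumentException("Illegal executor location format: " + str)
    }
    (splits(1), splits(2))
  }

  def main(args: Array[String]): Unit = {
    val (host, executorId) = parse("executor_localhost_2")
    assert(host == "localhost" && executorId == "2")
  }
}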

Author: zsxwing <[email protected]>

Closes #9181 from zsxwing/executor-location.
zsxwing authored and tdas committed Oct 27, 2015
1 parent 4f030b9 commit 9fbd75a
Showing 8 changed files with 217 additions and 132 deletions.
17 changes: 15 additions & 2 deletions core/src/main/scala/org/apache/spark/scheduler/TaskLocation.scala
@@ -31,7 +31,9 @@ private[spark] sealed trait TaskLocation {
   */
 private [spark]
 case class ExecutorCacheTaskLocation(override val host: String, executorId: String)
-  extends TaskLocation
+  extends TaskLocation {
+  override def toString: String = s"${TaskLocation.executorLocationTag}${host}_$executorId"
+}
 
 /**
  * A location on a host.
@@ -53,6 +55,9 @@ private[spark] object TaskLocation {
   // confusion. See RFC 952 and RFC 1123 for information about the format of hostnames.
   val inMemoryLocationTag = "hdfs_cache_"
 
+  // Identify locations of executors with this prefix.
+  val executorLocationTag = "executor_"
+
   def apply(host: String, executorId: String): TaskLocation = {
     new ExecutorCacheTaskLocation(host, executorId)
   }
@@ -65,7 +70,15 @@
   def apply(str: String): TaskLocation = {
     val hstr = str.stripPrefix(inMemoryLocationTag)
     if (hstr.equals(str)) {
-      new HostTaskLocation(str)
+      if (str.startsWith(executorLocationTag)) {
+        val splits = str.split("_")
+        if (splits.length != 3) {
+          throw new IllegalArgumentException("Illegal executor location format: " + str)
+        }
+        new ExecutorCacheTaskLocation(splits(1), splits(2))
+      } else {
+        new HostTaskLocation(str)
+      }
     } else {
       new HDFSCacheTaskLocation(hstr)
     }
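To show how the new format feeds into task scheduling, here is a hedged sketch of an RDD that pins its single partition to a specific executor. PinnedRDD and its location string are invented for illustration; this PR does not expose the format to users, and only the streaming scheduler relies on it:

import org.apache.spark.{Partition, SparkContext, TaskContext}
import org.apache.spark.rdd.RDD

// Hypothetical RDD whose single partition prefers executor "2" on "localhost".
class PinnedRDD(sc: SparkContext) extends RDD[Int](sc, Nil) {
  override def getPartitions: Array[Partition] =
    Array(new Partition { override def index: Int = 0 })

  override def compute(split: Partition, context: TaskContext): Iterator[Int] =
    Iterator(1, 2, 3)

  // After this PR, TaskLocation.apply parses the string below into an
  // ExecutorCacheTaskLocation, so the task prefers that exact executor
  // rather than any executor on the host.
  override def getPreferredLocations(split: Partition): Seq[String] =
    Seq("executor_localhost_2")
}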
core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
@@ -779,6 +779,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
   test("Test TaskLocation for different host type.") {
     assert(TaskLocation("host1") === HostTaskLocation("host1"))
     assert(TaskLocation("hdfs_cache_host1") === HDFSCacheTaskLocation("host1"))
+    assert(TaskLocation("executor_host1_3") === ExecutorCacheTaskLocation("host1", "3"))
   }
 
   def createTaskResult(id: Int): DirectTaskResult[Int] = {
streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
@@ -47,7 +47,8 @@ private[streaming] class ReceiverSupervisorImpl(
     checkpointDirOption: Option[String]
   ) extends ReceiverSupervisor(receiver, env.conf) with Logging {
 
-  private val hostPort = SparkEnv.get.blockManager.blockManagerId.hostPort
+  private val host = SparkEnv.get.blockManager.blockManagerId.host
+  private val executorId = SparkEnv.get.blockManager.blockManagerId.executorId
 
   private val receivedBlockHandler: ReceivedBlockHandler = {
     if (WriteAheadLogUtils.enableReceiverLog(env.conf)) {
@@ -179,7 +180,7 @@ private[streaming] class ReceiverSupervisorImpl(
 
   override protected def onReceiverStart(): Boolean = {
     val msg = RegisterReceiver(
-      streamId, receiver.getClass.getSimpleName, hostPort, endpoint)
+      streamId, receiver.getClass.getSimpleName, host, executorId, endpoint)
     trackerEndpoint.askWithRetry[Boolean](msg)
   }
 
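Reporting host and executorId separately lets the tracker rebuild the receiver's actual location and compare it with the scheduled ones. Below is a simplified sketch of the kind of check ReceiverTracker.registerReceiver can now perform; the object and signature are invented, the real logic carries more state, and the imported types are private[spark], so this would only compile inside the org.apache.spark package tree:

import org.apache.spark.scheduler.{ExecutorCacheTaskLocation, TaskLocation}

object RegistrationCheckSketch {
  // Accept the registration if the receiver landed on one of its scheduled
  // locations, matching either the exact executor or just the host.
  def isAllowed(
      host: String,
      executorId: String,
      scheduledLocations: Seq[TaskLocation]): Boolean = {
    val running = ExecutorCacheTaskLocation(host, executorId)
    scheduledLocations.isEmpty || scheduledLocations.exists {
      case e: ExecutorCacheTaskLocation => e == running
      case other => other.host == host
    }
  }
}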
streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverSchedulingPolicy.scala
@@ -20,32 +20,32 @@ package org.apache.spark.streaming.scheduler
 import scala.collection.Map
 import scala.collection.mutable
 
+import org.apache.spark.scheduler.{ExecutorCacheTaskLocation, TaskLocation}
 import org.apache.spark.streaming.receiver.Receiver
-import org.apache.spark.util.Utils
 
 /**
  * A class that tries to schedule receivers with evenly distributed. There are two phases for
  * scheduling receivers.
  *
  *  - The first phase is global scheduling when ReceiverTracker is starting and we need to schedule
  *    all receivers at the same time. ReceiverTracker will call `scheduleReceivers` at this phase.
- *    It will try to schedule receivers with evenly distributed. ReceiverTracker should update its
- *    receiverTrackingInfoMap according to the results of `scheduleReceivers`.
- *    `ReceiverTrackingInfo.scheduledExecutors` for each receiver will set to an executor list that
- *    contains the scheduled locations. Then when a receiver is starting, it will send a register
- *    request and `ReceiverTracker.registerReceiver` will be called. In
- *    `ReceiverTracker.registerReceiver`, if a receiver's scheduled executors is set, it should check
- *    if the location of this receiver is one of the scheduled executors, if not, the register will
+ *    It will try to schedule receivers such that they are evenly distributed. ReceiverTracker should
+ *    update its `receiverTrackingInfoMap` according to the results of `scheduleReceivers`.
+ *    `ReceiverTrackingInfo.scheduledLocations` for each receiver should be set to an location list
+ *    that contains the scheduled locations. Then when a receiver is starting, it will send a
+ *    register request and `ReceiverTracker.registerReceiver` will be called. In
+ *    `ReceiverTracker.registerReceiver`, if a receiver's scheduled locations is set, it should check
+ *    if the location of this receiver is one of the scheduled locations, if not, the register will
 *    be rejected.
 *  - The second phase is local scheduling when a receiver is restarting. There are two cases of
 *    receiver restarting:
 *    - If a receiver is restarting because it's rejected due to the real location and the scheduled
- *      executors mismatching, in other words, it fails to start in one of the locations that
+ *      locations mismatching, in other words, it fails to start in one of the locations that
 *      `scheduleReceivers` suggested, `ReceiverTracker` should firstly choose the executors that are
- *      still alive in the list of scheduled executors, then use them to launch the receiver job.
- *    - If a receiver is restarting without a scheduled executors list, or the executors in the list
+ *      still alive in the list of scheduled locations, then use them to launch the receiver job.
+ *    - If a receiver is restarting without a scheduled locations list, or the executors in the list
 *      are dead, `ReceiverTracker` should call `rescheduleReceiver`. If so, `ReceiverTracker` should
- *      not set `ReceiverTrackingInfo.scheduledExecutors` for this executor, instead, it should clear
+ *      not set `ReceiverTrackingInfo.scheduledLocations` for this receiver, instead, it should clear
 *      it. Then when this receiver is registering, we can know this is a local scheduling, and
 *      `ReceiverTrackingInfo` should call `rescheduleReceiver` again to check if the launching
 *      location is matching.
@@ -69,9 +69,12 @@ private[streaming] class ReceiverSchedulingPolicy {
    *   </ol>
    *
    * This method is called when we start to launch receivers at the first time.
+   *
+   * @return a map for receivers and their scheduled locations
    */
   def scheduleReceivers(
-      receivers: Seq[Receiver[_]], executors: Seq[String]): Map[Int, Seq[String]] = {
+      receivers: Seq[Receiver[_]],
+      executors: Seq[ExecutorCacheTaskLocation]): Map[Int, Seq[TaskLocation]] = {
     if (receivers.isEmpty) {
       return Map.empty
     }
@@ -80,24 +83,24 @@
       return receivers.map(_.streamId -> Seq.empty).toMap
     }
 
-    val hostToExecutors = executors.groupBy(executor => Utils.parseHostPort(executor)._1)
-    val scheduledExecutors = Array.fill(receivers.length)(new mutable.ArrayBuffer[String])
-    val numReceiversOnExecutor = mutable.HashMap[String, Int]()
+    val hostToExecutors = executors.groupBy(_.host)
+    val scheduledLocations = Array.fill(receivers.length)(new mutable.ArrayBuffer[TaskLocation])
+    val numReceiversOnExecutor = mutable.HashMap[ExecutorCacheTaskLocation, Int]()
     // Set the initial value to 0
     executors.foreach(e => numReceiversOnExecutor(e) = 0)
 
     // Firstly, we need to respect "preferredLocation". So if a receiver has "preferredLocation",
     // we need to make sure the "preferredLocation" is in the candidate scheduled executor list.
     for (i <- 0 until receivers.length) {
-      // Note: preferredLocation is host but executors are host:port
+      // Note: preferredLocation is host but executors are host_executorId
       receivers(i).preferredLocation.foreach { host =>
         hostToExecutors.get(host) match {
           case Some(executorsOnHost) =>
             // preferredLocation is a known host. Select an executor that has the least receivers in
             // this host
             val leastScheduledExecutor =
               executorsOnHost.minBy(executor => numReceiversOnExecutor(executor))
-            scheduledExecutors(i) += leastScheduledExecutor
+            scheduledLocations(i) += leastScheduledExecutor
             numReceiversOnExecutor(leastScheduledExecutor) =
               numReceiversOnExecutor(leastScheduledExecutor) + 1
           case None =>
@@ -106,40 +109,43 @@
             // 1. This executor is not up. But it may be up later.
             // 2. This executor is dead, or it's not a host in the cluster.
             // Currently, simply add host to the scheduled executors.
-            scheduledExecutors(i) += host
+
+            // Note: host could be `HDFSCacheTaskLocation`, so use `TaskLocation.apply` to handle
+            // this case
+            scheduledLocations(i) += TaskLocation(host)
         }
       }
     }
 
     // For those receivers that don't have preferredLocation, make sure we assign at least one
     // executor to them.
-    for (scheduledExecutorsForOneReceiver <- scheduledExecutors.filter(_.isEmpty)) {
+    for (scheduledLocationsForOneReceiver <- scheduledLocations.filter(_.isEmpty)) {
       // Select the executor that has the least receivers
       val (leastScheduledExecutor, numReceivers) = numReceiversOnExecutor.minBy(_._2)
-      scheduledExecutorsForOneReceiver += leastScheduledExecutor
+      scheduledLocationsForOneReceiver += leastScheduledExecutor
       numReceiversOnExecutor(leastScheduledExecutor) = numReceivers + 1
     }
 
     // Assign idle executors to receivers that have less executors
     val idleExecutors = numReceiversOnExecutor.filter(_._2 == 0).map(_._1)
     for (executor <- idleExecutors) {
       // Assign an idle executor to the receiver that has least candidate executors.
-      val leastScheduledExecutors = scheduledExecutors.minBy(_.size)
+      val leastScheduledExecutors = scheduledLocations.minBy(_.size)
       leastScheduledExecutors += executor
     }
 
-    receivers.map(_.streamId).zip(scheduledExecutors).toMap
+    receivers.map(_.streamId).zip(scheduledLocations).toMap
   }
 
   /**
-   * Return a list of candidate executors to run the receiver. If the list is empty, the caller can
+   * Return a list of candidate locations to run the receiver. If the list is empty, the caller can
    * run this receiver in arbitrary executor.
    *
    * This method tries to balance executors' load. Here is the approach to schedule executors
    * for a receiver.
    * <ol>
    * <li>
-   *     If preferredLocation is set, preferredLocation should be one of the candidate executors.
+   *     If preferredLocation is set, preferredLocation should be one of the candidate locations.
    * </li>
    * <li>
    *     Every executor will be assigned to a weight according to the receivers running or
@@ -163,40 +169,58 @@
       receiverId: Int,
       preferredLocation: Option[String],
       receiverTrackingInfoMap: Map[Int, ReceiverTrackingInfo],
-      executors: Seq[String]): Seq[String] = {
+      executors: Seq[ExecutorCacheTaskLocation]): Seq[TaskLocation] = {
     if (executors.isEmpty) {
       return Seq.empty
     }
 
     // Always try to schedule to the preferred locations
-    val scheduledExecutors = mutable.Set[String]()
-    scheduledExecutors ++= preferredLocation
-
-    val executorWeights = receiverTrackingInfoMap.values.flatMap { receiverTrackingInfo =>
-      receiverTrackingInfo.state match {
-        case ReceiverState.INACTIVE => Nil
-        case ReceiverState.SCHEDULED =>
-          val scheduledExecutors = receiverTrackingInfo.scheduledExecutors.get
-          // The probability that a scheduled receiver will run in an executor is
-          // 1.0 / scheduledLocations.size
-          scheduledExecutors.map(location => location -> (1.0 / scheduledExecutors.size))
-        case ReceiverState.ACTIVE => Seq(receiverTrackingInfo.runningExecutor.get -> 1.0)
-      }
-    }.groupBy(_._1).mapValues(_.map(_._2).sum) // Sum weights for each executor
+    val scheduledLocations = mutable.Set[TaskLocation]()
+    // Note: preferredLocation could be `HDFSCacheTaskLocation`, so use `TaskLocation.apply` to
+    // handle this case
+    scheduledLocations ++= preferredLocation.map(TaskLocation(_))
+
+    val executorWeights: Map[ExecutorCacheTaskLocation, Double] = {
+      receiverTrackingInfoMap.values.flatMap(convertReceiverTrackingInfoToExecutorWeights)
+        .groupBy(_._1).mapValues(_.map(_._2).sum) // Sum weights for each executor
+    }
 
     val idleExecutors = executors.toSet -- executorWeights.keys
     if (idleExecutors.nonEmpty) {
-      scheduledExecutors ++= idleExecutors
+      scheduledLocations ++= idleExecutors
     } else {
       // There is no idle executor. So select all executors that have the minimum weight.
       val sortedExecutors = executorWeights.toSeq.sortBy(_._2)
       if (sortedExecutors.nonEmpty) {
         val minWeight = sortedExecutors(0)._2
-        scheduledExecutors ++= sortedExecutors.takeWhile(_._2 == minWeight).map(_._1)
+        scheduledLocations ++= sortedExecutors.takeWhile(_._2 == minWeight).map(_._1)
       } else {
         // This should not happen since "executors" is not empty
       }
     }
-    scheduledExecutors.toSeq
+    scheduledLocations.toSeq
   }
+
+  /**
+   * This method tries to convert a receiver tracking info to executor weights. Every executor will
+   * be assigned to a weight according to the receivers running or scheduling on it:
+   *
+   * - If a receiver is running on an executor, it contributes 1.0 to the executor's weight.
+   * - If a receiver is scheduled to an executor but has not yet run, it contributes
+   *   `1.0 / #candidate_executors_of_this_receiver` to the executor's weight.
+   */
+  private def convertReceiverTrackingInfoToExecutorWeights(
+      receiverTrackingInfo: ReceiverTrackingInfo): Seq[(ExecutorCacheTaskLocation, Double)] = {
+    receiverTrackingInfo.state match {
+      case ReceiverState.INACTIVE => Nil
+      case ReceiverState.SCHEDULED =>
+        val scheduledLocations = receiverTrackingInfo.scheduledLocations.get
+        // The probability that a scheduled receiver will run in an executor is
+        // 1.0 / scheduledLocations.size
+        scheduledLocations.filter(_.isInstanceOf[ExecutorCacheTaskLocation]).map { location =>
+          location.asInstanceOf[ExecutorCacheTaskLocation] -> (1.0 / scheduledLocations.size)
+        }
+      case ReceiverState.ACTIVE => Seq(receiverTrackingInfo.runningExecutor.get -> 1.0)
+    }
+  }
 }
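A self-contained sketch of the weighting rule documented above, with plain strings standing in for ExecutorCacheTaskLocation and an invented scenario: one receiver is ACTIVE on exec1, another is SCHEDULED on either exec1 or exec2, and a third receiver is being rescheduled:

object ReceiverWeightSketch {
  def main(args: Array[String]): Unit = {
    // An ACTIVE receiver contributes 1.0 to the executor it runs on.
    val activeWeights = Seq("exec1" -> 1.0)
    // A SCHEDULED receiver with two candidates contributes 1.0 / 2 to each.
    val scheduledWeights = Seq("exec1", "exec2").map(_ -> (1.0 / 2))

    // Sum the contributions per executor, as rescheduleReceiver does.
    val executorWeights = (activeWeights ++ scheduledWeights)
      .groupBy(_._1).mapValues(_.map(_._2).sum)
    assert(executorWeights("exec1") == 1.5) // 1.0 + 0.5
    assert(executorWeights("exec2") == 0.5)

    // No executor is idle, so pick the ones that share the minimum weight.
    val sorted = executorWeights.toSeq.sortBy(_._2)
    val minWeight = sorted.head._2
    assert(sorted.takeWhile(_._2 == minWeight).map(_._1) == Seq("exec2"))
  }
}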